You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/09 09:31:39 UTC

[pulsar] branch branch-2.7 updated: [Cherry-pick] Add backoff for setting for getting topic policies to branch-2.7 (#11574)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 9bd54a9  [Cherry-pick] Add backoff for setting for getting topic policies to branch-2.7 (#11574)
9bd54a9 is described below

commit 9bd54a9397e95543defbcc435f5246c3a682f7ee
Author: GuoJiwei <te...@apache.org>
AuthorDate: Mon Aug 9 17:30:56 2021 +0800

    [Cherry-pick] Add backoff for setting for getting topic policies to branch-2.7 (#11574)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  49 +-
 .../broker/admin/impl/PersistentTopicsBase.java    | 568 +++++++++++----------
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 341 +++++++------
 .../apache/pulsar/client/impl/BackoffBuilder.java  |   3 +-
 4 files changed, 503 insertions(+), 458 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3e26aa7..665043b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -33,7 +33,10 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
@@ -51,6 +54,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.BackoffBuilder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.Constants;
@@ -95,6 +100,7 @@ public abstract class AdminResource extends PulsarWebResource {
     private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
     private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
     public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
+    private static final long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;
 
     protected ZooKeeper globalZk() {
         return pulsar().getGlobalZkCache().getZooKeeper();
@@ -541,19 +547,48 @@ public abstract class AdminResource extends PulsarWebResource {
         return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath);
     }
 
-    protected Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
+    protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName) {
+        return internalGetTopicPoliciesAsyncWithRetry(topicName,
+                new AtomicLong(DEFAULT_GET_TOPIC_POLICY_TIMEOUT), null, null);
+    }
+
+    protected CompletableFuture<Optional<TopicPolicies>> internalGetTopicPoliciesAsyncWithRetry(TopicName topicName,
+                                                                                                final AtomicLong remainingTime,
+                                                                                                final Backoff backoff,
+                                                                                                CompletableFuture<Optional<TopicPolicies>> future) {
+        CompletableFuture<Optional<TopicPolicies>> response = future == null ? new CompletableFuture<>() : future;
         try {
             checkTopicLevelPolicyEnable();
-            return Optional.ofNullable(pulsar().getTopicPoliciesService().getTopicPolicies(topicName));
+            response.complete(Optional.ofNullable(pulsar()
+                    .getTopicPoliciesService().getTopicPolicies(topicName)));
         } catch (RestException re) {
-            throw re;
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){
-            log.error("Topic {} policies cache have not init.", topicName);
-            throw new RestException(e);
+            response.completeExceptionally(re);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            Backoff usedBackoff = backoff == null ? new BackoffBuilder()
+                    .setInitialTime(500, TimeUnit.MILLISECONDS)
+                    .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS)
+                    .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS)
+                    .create() : backoff;
+            long nextDelay = Math.min(usedBackoff.next(), remainingTime.get());
+            if (nextDelay <= 0) {
+                response.completeExceptionally(new TimeoutException(
+                        String.format("Failed to get topic policy withing configured timeout %s ms",
+                                DEFAULT_GET_TOPIC_POLICY_TIMEOUT)));
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.error("Topic {} policies have not been initialized yet, retry after {}ms",
+                            topicName, nextDelay);
+                }
+                pulsar().getExecutor().schedule(() -> {
+                    remainingTime.addAndGet(-nextDelay);
+                    internalGetTopicPoliciesAsyncWithRetry(topicName, remainingTime, usedBackoff, response);
+                }, nextDelay, TimeUnit.MILLISECONDS);
+            }
         } catch (Exception e) {
             log.error("[{}] Failed to get topic policies {}", clientAppId(), topicName, e);
-            throw new RestException(e);
+            response.completeExceptionally(e);
         }
+        return response;
     }
 
     protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d02c9b3..47b42cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -74,7 +74,6 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -588,16 +587,14 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies) {
-        TopicPolicies topicPolicies;
-        try {
-            topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-            topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
-            topicPolicies.setDelayedDeliveryTickTimeMillis(
-                    deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                    .thenCompose(op -> {
+                        TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                        topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
+                        topicPolicies.setDelayedDeliveryTickTimeMillis(
+                                deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
+                        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                    });
     }
 
     private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
@@ -809,20 +806,12 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPolicies offloadPolicies) {
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies cache have not init.", topicName);
-            throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setOffloadPolicies(offloadPolicies);
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
-                .thenCompose((res) -> {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setOffloadPolicies(offloadPolicies);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                }).thenCompose(__ -> {
                     //The policy update is asynchronous. Cache at this step may not be updated yet.
                     //So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
                     PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
@@ -835,30 +824,16 @@ public class PersistentTopicsBase extends AdminResource {
                     } else {
                         return internalUpdateOffloadPolicies(offloadPolicies, topicName);
                     }
-                })
-                .whenComplete((result, e) -> {
-                    if (e != null) {
-                        completableFuture.completeExceptionally(e);
-                    } else {
-                        completableFuture.complete(null);
-                    }
-                });
-        return completableFuture;
+            });
     }
 
     protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies cache have not init.", topicName);
-            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies, TopicName topicName) {
@@ -892,19 +867,12 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxUnackedNum must be 0 or more");
         }
-
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies cache have not init.", topicName);
-            throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) {
@@ -912,29 +880,24 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxUnackedNum must be 0 or more");
         }
-
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies cache have not init.", topicName);
-            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval) {
         if (interval != null && interval < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "interval must be 0 or more");
         }
-        TopicPolicies policies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        policies.setDeduplicationSnapshotIntervalSeconds(interval);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
-
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies policies = op.orElseGet(TopicPolicies::new);
+                    policies.setDeduplicationSnapshotIntervalSeconds(interval);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
+                });
     }
 
     private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
@@ -2431,56 +2394,48 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
                                                               BacklogQuota backlogQuota) {
         validatePoliciesReadOnlyAccess();
-        TopicPolicies topicPolicies;
-        if (backlogQuotaType == null) {
-            backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage;
-        }
-        try {
-            topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
+        BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType == null
+                ? BacklogQuota.BacklogQuotaType.destination_storage : backlogQuotaType;
 
-        RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
-        if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
-            log.warn(
-                    "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
-                    clientAppId(), topicName);
-            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
-                    "Backlog Quota exceeds configured retention quota for topic. "
-                            + "Please increase retention quota and retry"));
-        }
+        return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
 
-        if (backlogQuota != null) {
-            topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), backlogQuota);
-        } else {
-            topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
-        }
-        Map<String, BacklogQuota> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
-            try {
-                log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
-                        clientAppId(),
-                        namespaceName,
-                        topicName.getLocalName(),
-                        jsonMapper().writeValueAsString(backLogQuotaMap));
-            } catch (JsonProcessingException ignore) { }
+
+            TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+            RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
+            if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
+                log.warn(
+                        "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
+                        clientAppId(), topicName);
+                return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                        "Backlog Quota exceeds configured retention quota for topic. "
+                                + "Please increase retention quota and retry"));
+            }
+
+            if (backlogQuota != null) {
+                topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), backlogQuota);
+            } else {
+                topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
+            }
+            Map<String, BacklogQuota> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
+            return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
+                try {
+                    log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
+                            clientAppId(),
+                            namespaceName,
+                            topicName.getLocalName(),
+                            jsonMapper().writeValueAsString(backLogQuotaMap));
+                } catch (JsonProcessingException ignore) { }
+            });
         });
     }
 
     protected CompletableFuture<Void> internalSetDeduplicationEnabled(Boolean enabled) {
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies cache have not init.", topicName);
-            throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setDeduplicationEnabled(enabled);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setDeduplicationEnabled(enabled);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond) {
@@ -2489,20 +2444,15 @@ public class PersistentTopicsBase extends AdminResource {
             return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
                     "Invalid value for message TTL"));
         }
-        TopicPolicies topicPolicies;
-        try {
-            topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-        topicPolicies.setMessageTTLInSeconds(ttlInSecond);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
-            log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
-                    clientAppId(),
-                    namespaceName,
-                    topicName.getLocalName(),
-                    ttlInSecond);
-        });
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setMessageTTLInSeconds(ttlInSecond);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+                            .thenRun(() ->
+                                    log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
+                                            clientAppId(), namespaceName, topicName.getLocalName(), ttlInSecond));
+                });
     }
 
 
@@ -2520,63 +2470,73 @@ public class PersistentTopicsBase extends AdminResource {
         return retentionPolicies;
     }
 
-    protected RetentionPolicies internalGetRetention(){
-        return getTopicPolicies(topicName)
-                .map(TopicPolicies::getRetentionPolicies).orElse(new RetentionPolicies());
+    protected CompletableFuture<RetentionPolicies> internalGetRetention(){
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getRetentionPolicies).orElseGet(RetentionPolicies::new));
     }
 
     protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
         if (retention == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-                .orElseGet(TopicPolicies::new);
-        BacklogQuota backlogQuota =
-                    topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
-        if (backlogQuota == null){
-            Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
-            backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
-        }
-        if(!checkBacklogQuota(backlogQuota, retention)){
-            log.warn(
-                    "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
-                    clientAppId(), topicName);
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "Retention Quota must exceed configured backlog quota for topic. " +
-                            "Please increase retention quota and retry");
-        }
-        topicPolicies.setRetentionPolicies(retention);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    BacklogQuota backlogQuota =
+                            topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
+                    if (backlogQuota == null){
+                        Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
+                        backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+                    }
+                    if(!checkBacklogQuota(backlogQuota, retention)){
+                        log.warn(
+                                "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
+                                clientAppId(), topicName);
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Retention Quota must exceed configured backlog quota for topic. " +
+                                        "Please increase retention quota and retry");
+                    }
+                    topicPolicies.setRetentionPolicies(retention);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalRemoveRetention() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setRetentionPolicies(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setRetentionPolicies(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
-    protected Optional<PersistencePolicies> internalGetPersistence(){
-        return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
+    protected CompletableFuture<Optional<PersistencePolicies>> internalGetPersistence(){
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getPersistence));
     }
 
     protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies) {
         validatePersistencePolicies(persistencePolicies);
 
-        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setPersistence(persistencePolicies);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setPersistence(persistencePolicies);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalRemovePersistence() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setPersistence(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setPersistence(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
     protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSize) {
@@ -2585,18 +2545,22 @@ public class PersistentTopicsBase extends AdminResource {
                     , "topic-level maxMessageSize must be greater than or equal to 0 " +
                     "and must be smaller than that in the broker-level");
         }
-
-        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setMaxMessageSize(maxMessageSize);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setMaxMessageSize(maxMessageSize);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
-    protected Optional<Integer> internalGetMaxMessageSize() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
+    protected CompletableFuture<Optional<Integer>> internalGetMaxMessageSize() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getMaxMessageSize));
     }
 
-    protected Optional<Integer> internalGetMaxProducers() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getMaxProducerPerTopic);
+    protected CompletableFuture<Optional<Integer>> internalGetMaxProducers() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getMaxProducerPerTopic));
     }
 
     protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers) {
@@ -2604,13 +2568,17 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxProducers must be 0 or more");
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setMaxProducerPerTopic(maxProducers);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setMaxProducerPerTopic(maxProducers);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
-    protected Optional<Integer> internalGetMaxSubscriptionsPerTopic() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getMaxSubscriptionsPerTopic);
+    protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getMaxSubscriptionsPerTopic));
     }
 
     protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) {
@@ -2618,10 +2586,12 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxSubscriptionsPerTopic must be 0 or more");
         }
-
-        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> preValidation(boolean authoritative) {
@@ -2651,16 +2621,19 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalRemoveMaxProducers() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setMaxProducerPerTopic(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setMaxProducerPerTopic(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
-    protected Optional<Integer> internalGetMaxConsumers() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumerPerTopic);
+    protected CompletableFuture<Optional<Integer>> internalGetMaxConsumers() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getMaxConsumerPerTopic));
     }
 
     protected CompletableFuture<Void> internalSetMaxConsumers(Integer maxConsumers) {
@@ -2668,19 +2641,23 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxConsumers must be 0 or more");
         }
-
-        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
-        topicPolicies.setMaxConsumerPerTopic(maxConsumers);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setMaxConsumerPerTopic(maxConsumers);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalRemoveMaxConsumers() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setMaxConsumerPerTopic(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setMaxConsumerPerTopic(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
     protected MessageId internalTerminate(boolean authoritative) {
@@ -3478,104 +3455,122 @@ public class PersistentTopicsBase extends AdminResource {
 
     }
 
-    protected Optional<DispatchRate> internalGetDispatchRate() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getDispatchRate);
+    protected CompletableFuture<Optional<DispatchRate>> internalGetDispatchRate() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getDispatchRate));
     }
 
     protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate dispatchRate) {
         if (dispatchRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-            .orElseGet(TopicPolicies::new);
-        topicPolicies.setDispatchRate(dispatchRate);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setDispatchRate(dispatchRate);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalRemoveDispatchRate() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setDispatchRate(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
-
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setDispatchRate(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
-    protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
+    protected CompletableFuture<Optional<DispatchRate>> internalGetSubscriptionDispatchRate() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getSubscriptionDispatchRate));
     }
 
     protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
         if (dispatchRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-            .orElseGet(TopicPolicies::new);
-        topicPolicies.setSubscriptionDispatchRate(dispatchRate);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setSubscriptionDispatchRate(dispatchRate);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setSubscriptionDispatchRate(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setSubscriptionDispatchRate(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
 
-    protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription);
+    protected CompletableFuture<Optional<Integer>> internalGetMaxConsumersPerSubscription() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getMaxConsumersPerSubscription));
     }
 
     protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
         if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription");
         }
-
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-                .orElseGet(TopicPolicies::new);
-        topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setMaxConsumersPerSubscription(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setMaxConsumersPerSubscription(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
-    protected Optional<Long> internalGetCompactionThreshold() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold);
+    protected CompletableFuture<Optional<Long>> internalGetCompactionThreshold() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getCompactionThreshold));
     }
 
     protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) {
         if (compactionThreshold != null && compactionThreshold < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
         }
-
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-            .orElseGet(TopicPolicies::new);
-        topicPolicies.setCompactionThreshold(compactionThreshold);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setCompactionThreshold(compactionThreshold);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-          return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setCompactionThreshold(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setCompactionThreshold(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
-    protected Optional<PublishRate> internalGetPublishRate() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);
+    protected CompletableFuture<Optional<PublishRate>> internalGetPublishRate() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getPublishRate));
 
     }
 
@@ -3583,42 +3578,51 @@ public class PersistentTopicsBase extends AdminResource {
         if (publishRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-            .orElseGet(TopicPolicies::new);
-        topicPolicies.setPublishRate(publishRate);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setPublishRate(publishRate);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalRemovePublishRate() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setPublishRate(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setPublishRate(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
-    protected Optional<SubscribeRate> internalGetSubscribeRate() {
-        return getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate);
+    protected CompletableFuture<Optional<SubscribeRate>> internalGetSubscribeRate() {
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenApply(op -> op.map(TopicPolicies::getSubscribeRate));
     }
 
     protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) {
         if (subscribeRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicPolicies topicPolicies = getTopicPolicies(topicName)
-                .orElseGet(TopicPolicies::new);
-        topicPolicies.setSubscribeRate(subscribeRate);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                    topicPolicies.setSubscribeRate(subscribeRate);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+                });
     }
 
     protected CompletableFuture<Void> internalRemoveSubscribeRate() {
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-        if (!topicPolicies.isPresent()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        topicPolicies.get().setSubscribeRate(null);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+        return getTopicPoliciesAsyncWithRetry(topicName)
+                .thenCompose(op -> {
+                    if (!op.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    op.get().setSubscribeRate(null);
+                    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+                });
     }
 
     protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9267c28..a108adb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -32,7 +32,6 @@ import io.swagger.annotations.ApiResponses;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -269,14 +268,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-                if (topicPolicies.isOffloadPoliciesSet()) {
-                    asyncResponse.resume(topicPolicies.getOffloadPolicies());
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                }
-            })
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenApply(op -> op.orElseGet(TopicPolicies::new))
+                .thenAccept(topicPolicies -> {
+                    if (topicPolicies.isOffloadPoliciesSet()) {
+                        asyncResponse.resume(topicPolicies.getOffloadPolicies());
+                    } else {
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getOffloadPolicies", ex, asyncResponse);
                 return null;
@@ -342,14 +342,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-                if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
-                    asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                }
-            })
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenApply(op -> op.orElseGet(TopicPolicies::new))
+                .thenAccept(topicPolicies -> {
+                    if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+                        asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+                    } else {
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse);
                 return null;
@@ -414,14 +415,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-                if (topicPolicies.isDeduplicationSnapshotIntervalSecondsSet()) {
-                    asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                }
-            })
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenApply(op -> op.orElseGet(TopicPolicies::new))
+                .thenAccept(topicPolicies -> {
+                    if (topicPolicies.isDeduplicationSnapshotIntervalSecondsSet()) {
+                        asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
+                    } else {
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getDeduplicationSnapshotInterval", ex, asyncResponse);
                 return null;
@@ -486,14 +488,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-                if (topicPolicies.isInactiveTopicPoliciesSet()) {
-                    asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                }
-            })
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenApply(op -> op.orElseGet(TopicPolicies::new))
+                .thenAccept(topicPolicies -> {
+                    if (topicPolicies.isInactiveTopicPoliciesSet()) {
+                        asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
+                    } else {
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
                 return null;
@@ -558,14 +561,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-                if (topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
-                    asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnSubscription());
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                }
-            })
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenApply(op -> op.orElseGet(TopicPolicies::new))
+                .thenAccept(topicPolicies -> {
+                    if (topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
+                        asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnSubscription());
+                    } else {
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
                 return null;
@@ -633,15 +637,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-                if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
-                    asyncResponse.resume(new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
-                            , topicPolicies.getDelayedDeliveryEnabled()));
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                }
-            })
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenApply(op -> op.orElseGet(TopicPolicies::new))
+                .thenAccept(topicPolicies -> {
+                    if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
+                        asyncResponse.resume(new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
+                                , topicPolicies.getDelayedDeliveryEnabled()));
+                    } else {
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getDelayedDeliveryPolicies", ex, asyncResponse);
                 return null;
@@ -1539,14 +1544,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> asyncResponse.resume(getTopicPolicies(topicName)
-                .map(TopicPolicies::getBackLogQuotaMap)
+             .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+             .thenApply(op -> op.map(TopicPolicies::getBackLogQuotaMap)
                 .map(map -> {
                     HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> hashMap = Maps.newHashMap();
                     map.forEach((key,value) -> hashMap.put(BacklogQuota.BacklogQuotaType.valueOf(key),value));
                     return hashMap;
-                })
-                .orElse(Maps.newHashMap())))
+                }).orElse(Maps.newHashMap()))
+              .thenAccept(r -> asyncResponse.resume(r))
             .exceptionally(ex -> {
                 handleTopicPolicyException("getBacklogQuotaMap", ex, asyncResponse);
                 return null;
@@ -1616,9 +1621,9 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> asyncResponse.resume(getTopicPolicies(topicName)
-                .map(TopicPolicies::getMessageTTLInSeconds)
-                .orElse(0)))
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+            .thenApply(op -> op.map(TopicPolicies::getMessageTTLInSeconds).orElse(0))
+            .thenAccept(r -> asyncResponse.resume(r))
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMessageTTL", ex, asyncResponse);
                 return null;
@@ -1687,14 +1692,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-                if (topicPolicies.isDeduplicationSet()) {
-                    asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                }
-            })
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenApply(op -> op.orElseGet(TopicPolicies::new))
+                .thenAccept(topicPolicies -> {
+                    if (topicPolicies.isDeduplicationSet()) {
+                        asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
+                    } else {
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getDeduplicationEnabled", ex, asyncResponse);
                 return null;
@@ -1762,17 +1768,18 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-                if (topicPolicies.isRetentionSet()) {
-                    asyncResponse.resume(topicPolicies.getRetentionPolicies());
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                }
-            })
-            .exceptionally(ex -> {
-                handleTopicPolicyException("getRetention", ex, asyncResponse);
-                return null;
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+                .thenApply(op -> op.orElseGet(TopicPolicies::new))
+                .thenAccept(topicPolicies -> {
+                    if (topicPolicies.isRetentionSet()) {
+                        asyncResponse.resume(topicPolicies.getRetentionPolicies());
+                    } else {
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                })
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("getRetention", ex, asyncResponse);
+                    return null;
             });
     }
 
@@ -1856,17 +1863,17 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<PersistencePolicies> persistencePolicies = internalGetPersistence();
-                if (!persistencePolicies.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(persistencePolicies.get());
-                }
-            })
-            .exceptionally(ex -> {
-                handleTopicPolicyException("getPersistence", ex, asyncResponse);
-                return null;
+                .thenCompose(__ -> internalGetPersistence())
+                .thenAccept(persistencePolicies -> {
+                    if (!persistencePolicies.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(persistencePolicies.get());
+                    }
+                })
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("getPersistence", ex, asyncResponse);
+                    return null;
             });
     }
 
@@ -1951,14 +1958,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<Integer> maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic();
-                if (!maxSubscriptionsPerTopic.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(maxSubscriptionsPerTopic.get());
-                }
-            })
+            .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic())
+                .thenAccept(maxSubscriptionsPerTopic -> {
+                    if (!maxSubscriptionsPerTopic.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(maxSubscriptionsPerTopic.get());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMaxSubscriptionsPerTopic", ex, asyncResponse);
                 return null;
@@ -2039,14 +2046,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<Integer> maxProducers = internalGetMaxProducers();
-                if (!maxProducers.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(maxProducers.get());
-                }
-            })
+            .thenCompose(__ -> internalGetMaxProducers())
+                .thenAccept(maxProducers -> {
+                    if (!maxProducers.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(maxProducers.get());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMaxProducers", ex, asyncResponse);
                 return null;
@@ -2129,14 +2136,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<Integer> maxConsumers = internalGetMaxConsumers();
-                if (!maxConsumers.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(maxConsumers.get());
-                }
-            })
+            .thenCompose(__ -> internalGetMaxConsumers())
+                .thenAccept(maxConsumers -> {
+                    if (!maxConsumers.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(maxConsumers.get());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMaxConsumers", ex, asyncResponse);
                 return null;
@@ -2219,14 +2226,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<Integer> policies = internalGetMaxMessageSize();
-                if (policies.isPresent()) {
-                    asyncResponse.resume(policies.get());
-                } else {
-                    asyncResponse.resume(Response.noContent().build());
-                }
-            })
+            .thenCompose(__ -> internalGetMaxMessageSize())
+                .thenAccept(policies -> {
+                    if (policies.isPresent()) {
+                        asyncResponse.resume(policies.get());
+                    } else {
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMaxMessageSize", ex, asyncResponse);
                 return null;
@@ -2512,14 +2519,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<DispatchRate> dispatchRate = internalGetDispatchRate();
-                if (!dispatchRate.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(dispatchRate.get());
-                }
-            })
+            .thenCompose(__ -> internalGetDispatchRate())
+                .thenAccept(dispatchRate -> {
+                    if (!dispatchRate.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(dispatchRate.get());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getDispatchRate", ex, asyncResponse);
                 return null;
@@ -2606,14 +2613,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<DispatchRate> dispatchRate = internalGetSubscriptionDispatchRate();
-                if (!dispatchRate.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(dispatchRate.get());
-                }
-            })
+            .thenCompose(__ -> internalGetSubscriptionDispatchRate())
+                .thenAccept(dispatchRate -> {
+                    if (!dispatchRate.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(dispatchRate.get());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getSubscriptionDispatchRate", ex, asyncResponse);
                 return null;
@@ -2700,14 +2707,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<Long> compactionThreshold = internalGetCompactionThreshold();
-                if (!compactionThreshold.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(compactionThreshold.get());
-                }
-            })
+            .thenCompose(__ -> internalGetCompactionThreshold())
+                .thenAccept(compactionThreshold -> {
+                    if (!compactionThreshold.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(compactionThreshold.get());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getCompactionThreshold", ex, asyncResponse);
                 return null;
@@ -2794,14 +2801,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<Integer> maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription();
-                if (!maxConsumersPerSubscription.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(maxConsumersPerSubscription.get());
-                }
-            })
+            .thenCompose(__ -> internalGetMaxConsumersPerSubscription())
+                .thenAccept(maxConsumersPerSubscription -> {
+                    if (!maxConsumersPerSubscription.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(maxConsumersPerSubscription.get());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getMaxConsumersPerSubscription", ex, asyncResponse);
                 return null;
@@ -2889,14 +2896,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<PublishRate> publishRate = internalGetPublishRate();
-                if (!publishRate.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(publishRate.get());
-                }
-            })
+            .thenCompose(__ -> internalGetPublishRate())
+                .thenAccept(publishRate -> {
+                    if (!publishRate.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(publishRate.get());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getPublishRate", ex, asyncResponse);
                 return null;
@@ -2983,14 +2990,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenRun(() -> {
-                Optional<SubscribeRate> subscribeRate = internalGetSubscribeRate();
-                if (!subscribeRate.isPresent()) {
-                    asyncResponse.resume(Response.noContent().build());
-                } else {
-                    asyncResponse.resume(subscribeRate.get());
-                }
-            })
+            .thenCompose(__ -> internalGetSubscribeRate())
+                .thenAccept(subscribeRate -> {
+                    if (!subscribeRate.isPresent()) {
+                        asyncResponse.resume(Response.noContent().build());
+                    } else {
+                        asyncResponse.resume(subscribeRate.get());
+                    }
+                })
             .exceptionally(ex -> {
                 handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
                 return null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
index a1c7614..e6b21cc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
@@ -34,8 +34,7 @@ public class BackoffBuilder {
     private long mandatoryStop;
     private TimeUnit unitMandatoryStop;
     
-    @VisibleForTesting
-    BackoffBuilder() {
+    public BackoffBuilder() {
         this.initial = 0;
         this.max = 0;
         this.mandatoryStop = 0;