You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/22 01:58:24 UTC

[pulsar] branch master updated: [Issue 12726][broker] Fix deadlock in metadata-store callback thread (#12753)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ebfcd71  [Issue 12726][broker] Fix deadlock in metadata-store callback thread (#12753)
ebfcd71 is described below

commit ebfcd713bc07b213cbba992192b9a1f5cc35d971
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Nov 22 09:57:12 2021 +0800

    [Issue 12726][broker] Fix deadlock in metadata-store callback thread (#12753)
    
    Fixes #12726
    
    ### Motivation
    
    See #12726
    
    
    ### Modifications
    
    Use PolicyHierarchyValue to cache the backlogQuota policies on the topic, so that no blocking metadata query is made from the metadata-callback thread.
---
 .../pulsar/broker/service/AbstractTopic.java       | 42 ++++++++--
 .../pulsar/broker/service/BacklogQuotaManager.java | 39 +--------
 .../service/nonpersistent/NonPersistentTopic.java  |  7 +-
 .../broker/service/persistent/PersistentTopic.java | 31 ++++---
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 96 ++++++++++++++--------
 .../service/persistent/MessageDuplicationTest.java | 38 +++++----
 .../policies/data/HierarchyTopicPolicies.java      |  8 ++
 7 files changed, 149 insertions(+), 112 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index d505971..26c591d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -37,6 +38,7 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import lombok.Getter;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
@@ -51,6 +53,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -140,13 +143,8 @@ public abstract class AbstractTopic implements Topic {
         ServiceConfiguration config = brokerService.pulsar().getConfiguration();
         this.replicatorPrefix = config.getReplicatorPrefix();
 
-
         topicPolicies = new HierarchyTopicPolicies();
-        topicPolicies.getInactiveTopicPolicies().updateBrokerValue(new InactiveTopicPolicies(
-                config.getBrokerDeleteInactiveTopicsMode(),
-                config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
-                config.isBrokerDeleteInactiveTopicsEnabled()));
-        topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
+        updateTopicPolicyByBrokerConfig(topicPolicies, brokerService);
 
         this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(
                 config.getMaxMessageSizeCheckIntervalInSeconds());
@@ -156,6 +154,38 @@ public abstract class AbstractTopic implements Topic {
         updatePublishDispatcher(Optional.empty());
     }
 
+    protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
+        }
+        if (namespacePolicies.deleted) {
+            return;
+        }
+        topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
+        topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
+        Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
+                type -> this.topicPolicies.getBackLogQuotaMap().get(type)
+                        .updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
+    }
+
+    private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicies, BrokerService brokerService) {
+        ServiceConfiguration config = brokerService.pulsar().getConfiguration();
+        topicPolicies.getInactiveTopicPolicies().updateBrokerValue(new InactiveTopicPolicies(
+                config.getBrokerDeleteInactiveTopicsMode(),
+                config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
+                config.isBrokerDeleteInactiveTopicsEnabled()));
+
+        topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
+
+        //init backlogQuota
+        topicPolicies.getBackLogQuotaMap()
+                .get(BacklogQuota.BacklogQuotaType.destination_storage)
+                .updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());
+        topicPolicies.getBackLogQuotaMap()
+                .get(BacklogQuota.BacklogQuotaType.message_age)
+                .updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());
+    }
+
     protected boolean isProducersExceeded() {
         Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 042f9ff..2df198c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service;
 import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -33,23 +32,17 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
 public class BacklogQuotaManager {
     private final BacklogQuotaImpl defaultQuota;
-    private final PulsarService pulsar;
-    private final boolean isTopicLevelPoliciesEnable;
     private final NamespaceResources namespaceResources;
 
-
     public BacklogQuotaManager(PulsarService pulsar) {
-        this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
         double backlogQuotaGB = pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB();
         this.defaultQuota = BacklogQuotaImpl.builder()
                 .limitSize(backlogQuotaGB > 0 ? (long) (backlogQuotaGB * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
@@ -58,7 +51,6 @@ public class BacklogQuotaManager {
                 .retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
                 .build();
         this.namespaceResources = pulsar.getPulsarResources().getNamespaceResources();
-        this.pulsar = pulsar;
     }
 
     public BacklogQuotaImpl getDefaultQuota() {
@@ -77,34 +69,6 @@ public class BacklogQuotaManager {
         }
     }
 
-    public BacklogQuotaImpl getBacklogQuota(TopicName topicName, BacklogQuotaType backlogQuotaType) {
-        if (!isTopicLevelPoliciesEnable) {
-            return getBacklogQuota(topicName.getNamespaceObject(), backlogQuotaType);
-        }
-
-        try {
-            return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
-                    .map(TopicPolicies::getBackLogQuotaMap)
-                    .map(map -> map.get(backlogQuotaType.name()))
-                    .orElseGet(() -> getBacklogQuota(topicName.getNamespaceObject(), backlogQuotaType));
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.debug("Topic policies cache have not init, will apply the namespace backlog quota: topicName={}",
-                    topicName);
-        } catch (Exception e) {
-            log.error("Failed to read topic policies data, "
-                            + "will apply the namespace backlog quota: topicName={}", topicName, e);
-        }
-        return getBacklogQuota(topicName.getNamespaceObject(), backlogQuotaType);
-    }
-
-    public long getBacklogQuotaLimitInSize(TopicName topicName) {
-        return getBacklogQuota(topicName, BacklogQuotaType.destination_storage).getLimitSize();
-    }
-
-    public int getBacklogQuotaLimitInTime(TopicName topicName) {
-        return getBacklogQuota(topicName, BacklogQuotaType.message_age).getLimitTime();
-    }
-
     /**
      * Handle exceeded size backlog by using policies set in the zookeeper for given topic.
      *
@@ -112,8 +76,7 @@ public class BacklogQuotaManager {
      */
     public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType,
                                            boolean preciseTimeBasedBacklogQuotaCheck) {
-        TopicName topicName = TopicName.get(persistentTopic.getName());
-        BacklogQuota quota = getBacklogQuota(topicName, backlogQuotaType);
+        BacklogQuota quota = persistentTopic.getBacklogQuota(backlogQuotaType);
         log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", backlogQuotaType,
                 persistentTopic.getName(), quota.getPolicy());
         switch (quota.getPolicy()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3c5831e..b2f9e61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -149,9 +149,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
                         isEncryptionRequired = false;
                     } else {
                         Policies policies = optPolicies.get();
+                        updateTopicPolicyByNamespacePolicy(policies);
                         isEncryptionRequired = policies.encryption_required;
                         isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
-                        topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(policies.inactive_topic_policies);
                         setSchemaCompatibilityStrategy(policies);
                         schemaValidationEnforced = policies.schema_validation_enforced;
                     }
@@ -964,6 +964,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
                     data.encryption_required);
         }
+
+        updateTopicPolicyByNamespacePolicy(data);
+
         isEncryptionRequired = data.encryption_required;
         setSchemaCompatibilityStrategy(data);
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
@@ -975,8 +978,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         });
         subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
 
-        this.topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(data.inactive_topic_policies);
-
         return checkReplicationAndRetryOnFailure();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 78f0fda..38661b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -31,6 +31,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.time.Clock;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -126,6 +127,7 @@ import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
@@ -317,6 +319,9 @@ public class PersistentTopic extends AbstractTopic
                     }
 
                     Policies policies = optPolicies.get();
+
+                    this.updateTopicPolicyByNamespacePolicy(policies);
+
                     this.isEncryptionRequired = policies.encryption_required;
 
                     setSchemaCompatibilityStrategy(policies);
@@ -324,8 +329,6 @@ public class PersistentTopic extends AbstractTopic
 
                     schemaValidationEnforced = policies.schema_validation_enforced;
 
-                    topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(policies.inactive_topic_policies);
-
                     updateUnackedMessagesAppliedOnSubscription(policies);
                     updateUnackedMessagesExceededOnConsumer(policies);
                 }).exceptionally(ex -> {
@@ -2399,6 +2402,9 @@ public class PersistentTopic extends AbstractTopic
             log.debug("Ignore the update because it has been deleted : {}", data);
             return CompletableFuture.completedFuture(null);
         }
+
+        updateTopicPolicyByNamespacePolicy(data);
+
         isEncryptionRequired = data.encryption_required;
 
         setSchemaCompatibilityStrategy(data);
@@ -2407,7 +2413,6 @@ public class PersistentTopic extends AbstractTopic
         schemaValidationEnforced = data.schema_validation_enforced;
         updateUnackedMessagesAppliedOnSubscription(data);
         updateUnackedMessagesExceededOnConsumer(data);
-        this.topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(data.max_subscriptions_per_topic);
 
         if (data.delayed_delivery_policies != null) {
             delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
@@ -2416,8 +2421,6 @@ public class PersistentTopic extends AbstractTopic
         //If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
         Optional<TopicPolicies> topicPolicies = getTopicPolicies();
 
-        this.topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(data.inactive_topic_policies);
-
         initializeRateLimiterIfNeeded(Optional.ofNullable(data));
 
         this.updateMaxPublishRate(data);
@@ -2453,7 +2456,6 @@ public class PersistentTopic extends AbstractTopic
             subscribeRateLimiter.get().onPoliciesUpdate(data);
         }
 
-
         return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture,
                 preCreateSubscriptionForCompactionIfNeeded());
     }
@@ -2463,9 +2465,8 @@ public class PersistentTopic extends AbstractTopic
      * @return Backlog quota for topic
      */
     @Override
-    public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
-        TopicName topicName = TopicName.get(this.getName());
-        return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName, backlogQuotaType);
+    public BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType) {
+        return this.topicPolicies.getBackLogQuotaMap().get(backlogQuotaType).get();
     }
 
     /**
@@ -2497,8 +2498,7 @@ public class PersistentTopic extends AbstractTopic
      * @return determine if backlog quota enforcement needs to be done for topic based on size limit
      */
     public boolean isSizeBacklogExceeded() {
-        TopicName topicName = TopicName.get(getName());
-        long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimitInSize(topicName);
+        long backlogQuotaLimitInBytes = getBacklogQuota(BacklogQuotaType.destination_storage).getLimitSize();
         if (backlogQuotaLimitInBytes < 0) {
             return false;
         }
@@ -2521,7 +2521,7 @@ public class PersistentTopic extends AbstractTopic
     public boolean isTimeBacklogExceeded() {
         TopicName topicName = TopicName.get(getName());
         CompletableFuture<Boolean> future = new CompletableFuture<>();
-        int backlogQuotaLimitInSecond = brokerService.getBacklogQuotaManager().getBacklogQuotaLimitInTime(topicName);
+        int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
 
         // If backlog quota by time is not set and we have no durable cursor.
         if (backlogQuotaLimitInSecond <= 0
@@ -3088,8 +3088,13 @@ public class PersistentTopic extends AbstractTopic
         }
 
         topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(policies.getMaxSubscriptionsPerTopic());
-
         topicPolicies.getInactiveTopicPolicies().updateTopicValue(policies.getInactiveTopicPolicies());
+        Arrays.stream(BacklogQuotaType.values()).forEach(type ->
+                this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
+                        policies.getBackLogQuotaMap() == null ? null :
+                                policies.getBackLogQuotaMap().get(type.toString()))
+        );
+
 
         updateUnackedMessagesAppliedOnSubscription(namespacePolicies.orElse(null));
         initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index d4a7963..e0e6b5c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -44,7 +44,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.BacklogQuotaManager;
 import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -100,6 +99,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
 
     private final String topicPolicyEventsTopic = "persistent://" + myNamespace + "/__change_events";
 
+    private final int testTopicPartitions = 2;
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -113,7 +114,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
         admin.tenants().createTenant(this.testTenant, tenantInfo);
         admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
-        admin.topics().createPartitionedTopic(testTopic, 2);
+        admin.topics().createPartitionedTopic(testTopic, testTopicPartitions);
         Producer producer = pulsarClient.newProducer().topic(testTopic).create();
         producer.close();
         waitForZooKeeperWatchers();
@@ -141,13 +142,13 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                         .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
-
-        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager
-                .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
-        Assert.assertEquals(backlogQuota, backlogQuotaInManager);
-
+        for (int i = 0; i < testTopicPartitions; i++) {
+            String partition = TopicName.get(testTopic).getPartition(i).toString();
+            Topic topic = pulsar.getBrokerService().getTopic(partition, false).get().get();
+            BacklogQuota backlogQuotaInTopic = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
+            log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInTopic, testTopic);
+            Assert.assertEquals(backlogQuota, backlogQuotaInTopic);
+        }
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
@@ -164,18 +165,28 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                         .get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota));
-
-        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager
-                .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age);
-
-        Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+        for (int i = 0; i < testTopicPartitions; i++) {
+            String partition = TopicName.get(testTopic).getPartition(i).toString();
+            Topic topic = pulsar.getBrokerService().getTopic(partition, false).get().get();
+            BacklogQuota backlogQuotaInTopic = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age);
+            Assert.assertEquals(backlogQuota, backlogQuotaInTopic);
+        }
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
     public void testRemoveSizeBasedBacklogQuota() throws Exception {
+        List<Topic> partitions = new ArrayList<>();
+        List<BacklogQuota> defaultBacklogQuotas = new ArrayList<>();
+        for (int i = 0; i < testTopicPartitions; i++) {
+            String partition = TopicName.get(testTopic).getPartition(i).toString();
+            Topic topic = pulsar.getBrokerService().getOrCreateTopic(partition).get();
+            partitions.add(topic);
+            BacklogQuota defaultBacklogQuota = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
+            defaultBacklogQuotas.add(defaultBacklogQuota);
+        }
+
         BacklogQuota backlogQuota = BacklogQuota.builder()
                 .limitSize(1024)
                 .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
@@ -189,52 +200,69 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                         .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
 
-        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager
-                .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
-        Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+        partitions.forEach(topic -> {
+            BacklogQuota backlogQuotaInTopic = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
+            log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInTopic, testTopic);
+            Assert.assertEquals(backlogQuota, backlogQuotaInTopic);
+        });
 
         admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.destination_storage);
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic)
                         .get(BacklogQuota.BacklogQuotaType.destination_storage)));
-
-        backlogQuotaInManager = backlogQuotaManager
-                .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager,
-                testTopic);
-        Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);
+        for (int i = 0; i < partitions.size(); i++) {
+            BacklogQuota backlogQuotaInTopic =
+                    partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage);
+            log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInTopic,
+                    testTopic);
+            Assert.assertEquals(defaultBacklogQuotas.get(i), backlogQuotaInTopic);
+        }
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
     public void testRemoveTimeBasedBacklogQuota() throws Exception {
+        List<Topic> partitions = new ArrayList<>();
+        List<BacklogQuota> defaultBacklogQuotas = new ArrayList<>();
+        for (int i = 0; i < testTopicPartitions; i++) {
+            String partition = TopicName.get(testTopic).getPartition(i).toString();
+            Topic topic = pulsar.getBrokerService().getOrCreateTopic(partition).get();
+            partitions.add(topic);
+            BacklogQuota defaultBacklogQuota = topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age);
+            defaultBacklogQuotas.add(defaultBacklogQuota);
+        }
+
         BacklogQuota backlogQuota = BacklogQuota.builder()
                 .limitTime(1000)
                 .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                 .build();
-
         admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age);
 
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                         .get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota));
 
-        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager
-                .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age);
-        Assert.assertEquals(backlogQuota, backlogQuotaInManager);
+        for (int i = 0; i < partitions.size(); i++) {
+            Assert.assertEquals(partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age),
+                    backlogQuota);
+            //destination_storage should keep the same.
+            Assert.assertEquals(partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage),
+                    defaultBacklogQuotas.get(i));
+        }
 
         admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.message_age);
         Awaitility.await()
                 .untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic)
                         .get(BacklogQuota.BacklogQuotaType.message_age)));
 
-        backlogQuotaInManager = backlogQuotaManager
-                .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age);
-        Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);
+        for (int i = 0; i < partitions.size(); i++) {
+            BacklogQuota backlogQuotaInTopic =
+                    partitions.get(i).getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age);
+            log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInTopic,
+                    testTopic);
+            Assert.assertEquals(defaultBacklogQuotas.get(i), backlogQuotaInTopic);
+        }
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 866ed13..c23c20f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -18,24 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.EventLoopGroup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.testng.annotations.Test;
-
-import java.lang.reflect.Field;
-import java.util.Map;
-
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -50,6 +32,24 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoopGroup;
+import java.lang.reflect.Field;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker")
@@ -218,6 +218,7 @@ public class MessageDuplicationTest {
         serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);
 
         doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
+        doReturn(mock(PulsarResources.class)).when(pulsarService).getPulsarResources();
 
         ManagedLedger managedLedger = mock(ManagedLedger.class);
         MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger));
@@ -236,6 +237,7 @@ public class MessageDuplicationTest {
         BrokerService brokerService = mock(BrokerService.class);
         doReturn(eventLoopGroup).when(brokerService).executor();
         doReturn(pulsarService).when(brokerService).pulsar();
+        doReturn(new BacklogQuotaManager(pulsarService)).when(brokerService).getBacklogQuotaManager();
 
         PersistentTopic persistentTopic = spy(new PersistentTopic("topic-1", brokerService, managedLedger, messageDeduplication));
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 197b973..47df060 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -19,7 +19,10 @@
 
 package org.apache.pulsar.common.policies.data;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
 import lombok.Getter;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 
 /**
  * Topic policy hierarchy value container
@@ -28,9 +31,14 @@ import lombok.Getter;
 public class HierarchyTopicPolicies {
     final PolicyHierarchyValue<InactiveTopicPolicies> inactiveTopicPolicies;
     final PolicyHierarchyValue<Integer> maxSubscriptionsPerTopic;
+    final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
 
     public HierarchyTopicPolicies() {
         inactiveTopicPolicies = new PolicyHierarchyValue<>();
         maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
+        backLogQuotaMap = new ImmutableMap.Builder<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>()
+                .put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>())
+                .put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())
+                .build();
     }
 }