You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2023/10/14 01:45:15 UTC

[pulsar] branch branch-3.1 updated: [fix][broker][branch-3.1] Fix inconsistent topic policy (#21255)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new a88fe1f0029 [fix][broker][branch-3.1] Fix inconsistent topic policy (#21255)
a88fe1f0029 is described below

commit a88fe1f002944db77ec5ecbd8ac61b7e0055404d
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Sat Oct 14 09:45:05 2023 +0800

    [fix][broker][branch-3.1] Fix inconsistent topic policy (#21255)
---
 .../ProxySaslAuthenticationTest.java               |  13 +-
 .../authentication/SaslAuthenticateTest.java       |  11 +-
 .../pulsar/broker/service/BrokerService.java       | 295 +++++++++++----------
 .../SystemTopicBasedTopicPoliciesService.java      | 100 +++++--
 .../broker/service/TopicPoliciesService.java       |  41 +++
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   1 +
 .../pulsar/broker/admin/TopicPoliciesTest.java     |   2 +-
 .../admin/TopicPoliciesWithBrokerRestartTest.java  | 104 ++++++++
 .../apache/pulsar/broker/admin/TopicsAuthTest.java |   2 +
 .../apache/pulsar/broker/auth/AuthLogsTest.java    |   2 +
 .../pulsar/broker/auth/MockAuthentication.java     |   6 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  19 ++
 .../broker/service/BrokerBookieIsolationTest.java  |   3 +-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |   6 +-
 .../service/persistent/PersistentTopicTest.java    |   3 +-
 .../AuthenticationTlsHostnameVerificationTest.java |   3 +
 .../api/AuthorizationProducerConsumerTest.java     |   5 +
 .../client/api/MutualAuthenticationTest.java       |   2 +-
 .../TokenAuthenticatedProducerConsumerTest.java    |   3 +
 ...kenOauth2AuthenticatedProducerConsumerTest.java |  14 +-
 ...eyStoreTlsProducerConsumerTestWithAuthTest.java |  22 ++
 .../impl/PatternTopicsConsumerImplAuthTest.java    |   1 +
 .../configurations/standalone_no_client_auth.conf  |   3 +-
 .../proxy/server/ProxyAuthenticationTest.java      |   2 +-
 .../proxy/server/ProxyForwardAuthDataTest.java     |   2 +-
 .../proxy/server/ProxyRolesEnforcementTest.java    |   2 +-
 .../server/ProxyWithAuthorizationNegTest.java      |   2 +
 .../apache/pulsar/sql/presto/TestPulsarAuth.java   |   4 +
 .../loadbalance/ExtensibleLoadManagerTest.java     |   1 +
 .../integration/presto/TestPulsarSQLAuth.java      |   1 +
 30 files changed, 478 insertions(+), 197 deletions(-)

diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index 261efe680f8..f0e45aa734a 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.proxy.server.ProxyConfiguration;
 import org.apache.pulsar.proxy.server.ProxyService;
 import org.slf4j.Logger;
@@ -193,15 +194,17 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 		conf.setAuthenticationProviders(providers);
 		conf.setClusterName("test");
 		conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" + kdc.getRealm()));
-
-		super.init();
-
-		lookupUrl = new URI(pulsar.getBrokerServiceUrl());
-
 		// set admin auth, to verify admin web resources
 		Map<String, String> clientSaslConfig = new HashMap<>();
 		clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
 		clientSaslConfig.put("serverType", "broker");
+		conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+		conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+				.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
+
+		super.init();
+
+		lookupUrl = new URI(pulsar.getBrokerServiceUrl());
 		log.info("set client jaas section name: PulsarClient");
 		admin = PulsarAdmin.builder()
 			.serviceHttpUrl(brokerUrl.toString())
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index 5cace2221de..230c2ad787d 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.sasl.SaslConstants;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -186,7 +187,12 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
         conf.setAuthenticationProviders(providers);
         conf.setClusterName("test");
         conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + kdc.getRealm()));
-
+        Map<String, String> clientSaslConfig = new HashMap<>();
+        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+        clientSaslConfig.put("serverType", "broker");
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+        conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+                .getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
         super.init();
 
         lookupUrl = new URI(pulsar.getWebServiceAddress());
@@ -197,9 +203,6 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
             .authentication(authSasl));
 
         // set admin auth, to verify admin web resources
-        Map<String, String> clientSaslConfig = new HashMap<>();
-        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
-        clientSaslConfig.put("serverType", "broker");
         log.info("set client jaas section name: PulsarClient");
         admin = PulsarAdmin.builder()
             .serviceHttpUrl(brokerUrl.toString())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2cf141ed329..6a61e851fbc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1760,165 +1760,172 @@ public class BrokerService implements Closeable {
         });
     }
 
-    public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName topicName) {
+    public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull TopicName topicName) {
+        requireNonNull(topicName);
         NamespaceName namespace = topicName.getNamespaceObject();
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
         NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources();
         LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies();
-        return nsr.getPoliciesAsync(namespace)
-                .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies, localPolicies) -> {
-                    PersistencePolicies persistencePolicies = null;
-                    RetentionPolicies retentionPolicies = null;
-                    OffloadPoliciesImpl topicLevelOffloadPolicies = null;
-
-                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
-                            && !NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-                        final TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
-                                .getTopicPoliciesIfExists(topicName);
-                        if (topicPolicies != null) {
-                            persistencePolicies = topicPolicies.getPersistence();
-                            retentionPolicies = topicPolicies.getRetentionPolicies();
-                            topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
-                        }
-                    }
-
-                    if (persistencePolicies == null) {
-                        persistencePolicies = policies.map(p -> p.persistence).orElseGet(
-                                () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-                                        serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-                                        serviceConfig.getManagedLedgerDefaultAckQuorum(),
-                                        serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
-                    }
+        final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+        if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+            && !NamespaceService.isSystemServiceNamespace(namespace.toString())
+            && !SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
+            topicPoliciesFuture = pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
+        } else {
+            topicPoliciesFuture = CompletableFuture.completedFuture(Optional.empty());
+        }
+        return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> {
+            final CompletableFuture<Optional<Policies>> nsPolicies = nsr.getPoliciesAsync(namespace);
+            final CompletableFuture<Optional<LocalPolicies>> lcPolicies = lpr.getLocalPoliciesAsync(namespace);
+            return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> {
+                PersistencePolicies persistencePolicies = null;
+                RetentionPolicies retentionPolicies = null;
+                OffloadPoliciesImpl topicLevelOffloadPolicies = null;
+                if (topicPoliciesOptional.isPresent()) {
+                    final TopicPolicies topicPolicies = topicPoliciesOptional.get();
+                    persistencePolicies = topicPolicies.getPersistence();
+                    retentionPolicies = topicPolicies.getRetentionPolicies();
+                    topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+                }
 
-                    if (retentionPolicies == null) {
-                        retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
-                                () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                        serviceConfig.getDefaultRetentionSizeInMB())
-                        );
-                    }
+                if (persistencePolicies == null) {
+                    persistencePolicies = policies.map(p -> p.persistence).orElseGet(
+                            () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+                                    serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+                                    serviceConfig.getManagedLedgerDefaultAckQuorum(),
+                                    serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+                }
 
-                    ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-                    managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-                    managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-                    managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+                if (retentionPolicies == null) {
+                    retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
+                            () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                    serviceConfig.getDefaultRetentionSizeInMB())
+                    );
+                }
 
-                    if (serviceConfig.isStrictBookieAffinityEnabled()) {
+                ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+                managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+                managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+                managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+
+                if (serviceConfig.isStrictBookieAffinityEnabled()) {
+                    managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
+                            IsolatedBookieEnsemblePlacementPolicy.class);
+                    if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
+                        Map<String, Object> properties = new HashMap<>();
+                        properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                                localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+                        properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                                localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+                        managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                    } else if (isSystemTopic(topicName)) {
+                        Map<String, Object> properties = new HashMap<>();
+                        properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "*");
+                        properties.put(IsolatedBookieEnsemblePlacementPolicy
+                                .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+                        managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                    } else {
+                        Map<String, Object> properties = new HashMap<>();
+                        properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "");
+                        properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
+                        managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                    }
+                } else {
+                    if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
                         managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
                                 IsolatedBookieEnsemblePlacementPolicy.class);
-                        if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
-                            Map<String, Object> properties = new HashMap<>();
-                            properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                    localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                            properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                    localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                            managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                        } else if (isSystemTopic(topicName)) {
-                            Map<String, Object> properties = new HashMap<>();
-                            properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "*");
-                            properties.put(IsolatedBookieEnsemblePlacementPolicy
-                                    .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
-                            managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                        } else {
-                            Map<String, Object> properties = new HashMap<>();
-                            properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "");
-                            properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
-                            managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                        }
-                    } else {
-                        if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
-                            managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
-                                            IsolatedBookieEnsemblePlacementPolicy.class);
-                            Map<String, Object> properties = new HashMap<>();
-                            properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                                    localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-                            properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                                    localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-                            managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                        }
+                        Map<String, Object> properties = new HashMap<>();
+                        properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                                localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+                        properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                                localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+                        managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
                     }
+                }
 
-                    managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-                    managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-                    managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
-                    managedLedgerConfig
-                            .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-                    managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
-                            serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
-                    managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
-                            serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
-                    managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
-                    managedLedgerConfig
-                            .setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
-                                    TimeUnit.MINUTES);
-                    managedLedgerConfig
-                            .setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
-                                    TimeUnit.MINUTES);
-                    managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
-                    managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-                            serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
-                    managedLedgerConfig
-                            .setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
-                    managedLedgerConfig
-                            .setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-                    managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
-                    managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-                            serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-                    managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-                    managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
-                    managedLedgerConfig
-                            .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
-                    managedLedgerConfig
-                            .setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
-                    managedLedgerConfig
-                            .setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
-                    managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-                    managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-                    managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-                    managedLedgerConfig.setInactiveLedgerRollOverTime(
-                            serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
-                    managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
-                            serviceConfig.isCacheEvictionByMarkDeletedPosition());
-                    managedLedgerConfig.setMinimumBacklogCursorsForCaching(
-                            serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
-                    managedLedgerConfig.setMinimumBacklogEntriesForCaching(
-                            serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
-                    managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
-                            serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
-
-                    OffloadPoliciesImpl nsLevelOffloadPolicies =
-                            (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
-                    OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(
-                            topicLevelOffloadPolicies,
-                            OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
-                            getPulsar().getConfig().getProperties());
-                    if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-                        managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
-                    } else  {
-                        if (topicLevelOffloadPolicies != null) {
-                            try {
-                                LedgerOffloader topicLevelLedgerOffLoader =
-                                        pulsar().createManagedLedgerOffloader(offloadPolicies);
-                                managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                            } catch (PulsarServerException e) {
-                                throw new RuntimeException(e);
-                            }
-                        } else {
-                            //If the topic level policy is null, use the namespace level
-                            managedLedgerConfig
-                                    .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+                managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+                managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+                managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+                managedLedgerConfig
+                        .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+                managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
+                        serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
+                managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
+                        serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
+                managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+                managedLedgerConfig
+                        .setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+                                TimeUnit.MINUTES);
+                managedLedgerConfig
+                        .setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+                                TimeUnit.MINUTES);
+                managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+                managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                        serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+                managedLedgerConfig
+                        .setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+                managedLedgerConfig
+                        .setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+                managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+                managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+                        serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+                managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+                managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+                managedLedgerConfig
+                        .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+                managedLedgerConfig
+                        .setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+                managedLedgerConfig
+                        .setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
+                managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+                managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+                managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+                managedLedgerConfig.setInactiveLedgerRollOverTime(
+                        serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
+                managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+                        serviceConfig.isCacheEvictionByMarkDeletedPosition());
+                managedLedgerConfig.setMinimumBacklogCursorsForCaching(
+                        serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
+                managedLedgerConfig.setMinimumBacklogEntriesForCaching(
+                        serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
+                managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
+                        serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
+
+                OffloadPoliciesImpl nsLevelOffloadPolicies =
+                        (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
+                OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(
+                        topicLevelOffloadPolicies,
+                        OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
+                        getPulsar().getConfig().getProperties());
+                if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+                    managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+                } else  {
+                    if (topicLevelOffloadPolicies != null) {
+                        try {
+                            LedgerOffloader topicLevelLedgerOffLoader =
+                                    pulsar().createManagedLedgerOffloader(offloadPolicies);
+                            managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                        } catch (PulsarServerException e) {
+                            throw new RuntimeException(e);
                         }
+                    } else {
+                        //If the topic level policy is null, use the namespace level
+                        managedLedgerConfig
+                                .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
                     }
+                }
 
-                    managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-                            serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
-                    managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-                            serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
-                    return managedLedgerConfig;
-                });
+                managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+                        serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+                managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+                        serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+                return managedLedgerConfig;
+            });
+        });
     }
 
     private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 09f8de818db..ed76d37ae25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static java.util.Objects.requireNonNull;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -54,6 +56,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,8 +81,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
             readerCaches = new ConcurrentHashMap<>();
-    @VisibleForTesting
-    final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
+
+    final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new ConcurrentHashMap<>();
 
     @VisibleForTesting
     final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
@@ -219,12 +222,12 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                                           boolean isGlobal) throws TopicPoliciesCacheNotInitException {
         if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
             NamespaceName namespace = topicName.getNamespaceObject();
-            prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+            prepareInitPoliciesCacheAsync(namespace);
         }
 
         MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result = new MutablePair<>();
         policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> {
-            if (initialized == null || !initialized) {
+            if (initialized == null || !initialized.isDone()) {
                 result.setLeft(new TopicPoliciesCacheNotInitException());
             } else {
                 TopicPolicies topicPolicies =
@@ -242,6 +245,34 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         }
     }
 
+    @NotNull
+    @Override
+    public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName,
+                                                                            boolean isGlobal) {
+        requireNonNull(topicName);
+        final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+        return preparedFuture.thenApply(__ -> {
+            final TopicPolicies candidatePolicies = isGlobal
+                    ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
+                    : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+            return Optional.ofNullable(candidatePolicies);
+        });
+    }
+
+    @NotNull
+    @Override
+    public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName) {
+        requireNonNull(topicName);
+        final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+        return preparedFuture.thenApply(__ -> {
+            final TopicPolicies localPolicies = policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+            if (localPolicies != null) {
+                return Optional.of(localPolicies);
+            }
+            return Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
+        });
+    }
+
     @Override
     public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
         return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
@@ -265,39 +296,48 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     @Override
     public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
         if (NamespaceService.isHeartbeatNamespace(namespace)) {
-            result.complete(null);
-            return result;
+            return CompletableFuture.completedFuture(null);
         }
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
-                result.complete(null);
+                return CompletableFuture.completedFuture(null);
             } else {
-                prepareInitPoliciesCache(namespace, result);
+                return prepareInitPoliciesCacheAsync(namespace);
             }
         }
-        return result;
     }
 
-    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
-        if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
-            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
+    private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+        requireNonNull(namespace);
+        return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+            final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
             ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
-            readerCompletableFuture.thenAccept(reader -> {
-                initPolicesCache(reader, result);
-                result.thenRun(() -> readMorePolicies(reader));
-            }).exceptionally(ex -> {
-                log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                cleanCacheAndCloseReader(namespace, false);
-                result.completeExceptionally(ex);
+            final CompletableFuture<Void> initFuture = readerCompletableFuture
+                    .thenCompose(reader -> {
+                        final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
+                        initPolicesCache(reader, stageFuture);
+                        return stageFuture
+                                // Read policies in background
+                                .thenAccept(__ -> readMorePoliciesAsync(reader));
+                    });
+            initFuture.exceptionally(ex -> {
+                try {
+                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+                    cleanCacheAndCloseReader(namespace, false);
+                } catch (Throwable cleanupEx) {
+                    // Adding this catch to avoid break callback chain
+                    log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx);
+                }
                 return null;
             });
-        }
+            // let caller know we've got an exception.
+            return initFuture;
+        });
     }
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
@@ -381,8 +421,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
                 }
-                policyCacheInitMap.computeIfPresent(
-                        reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+
                 // replay policy message
                 policiesCache.forEach(((topicName, topicPolicies) -> {
                     if (listeners.get(topicName) != null) {
@@ -395,6 +434,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         }
                     }
                 }));
+
                 future.complete(null);
             }
         });
@@ -420,7 +460,13 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         });
     }
 
-    private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
+    /**
+     * This is an async method for the background reader to continue syncing new messages.
+     *
+     * Note: You should not do any blocking call here. because it will affect
+     * #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic.
+     */
+    private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) {
         reader.readNextAsync()
                 .thenAccept(msg -> {
                     refreshTopicPoliciesCache(msg);
@@ -428,7 +474,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 })
                 .whenComplete((__, ex) -> {
                     if (ex == null) {
-                        readMorePolicies(reader);
+                        readMorePoliciesAsync(reader);
                     } else {
                         Throwable cause = FutureUtil.unwrapCompletionException(ex);
                         if (cause instanceof PulsarClientException.AlreadyClosedException) {
@@ -437,7 +483,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                                     reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                         } else {
                             log.warn("Read more topic polices exception, read again.", ex);
-                            readMorePolicies(reader);
+                            readMorePoliciesAsync(reader);
                         }
                     }
                 });
@@ -605,7 +651,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     }
 
     @VisibleForTesting
-    public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
+    public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName namespaceName) {
         return policyCacheInitMap.get(namespaceName);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index c4bcc0c3935..aa3a6aaeff2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -22,6 +22,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.BackoffBuilder;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Topic policies service.
@@ -109,6 +111,32 @@ public interface TopicPoliciesService {
         return response;
     }
 
+    /**
+     * Asynchronously retrieves topic policies.
+     * This triggers the Pulsar broker's internal client to load policies from the
+     * system topic `persistent://tenant/namespace/__change_event`.
+     *
+     * @param topicName The name of the topic.
+     * @param isGlobal Indicates if the policies are global.
+     * @return A CompletableFuture containing an Optional of TopicPolicies.
+     * @throws NullPointerException If the topicName is null.
+     */
+    @Nonnull
+    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal);
+
+    /**
+     * Asynchronously retrieves topic policies.
+     * This triggers the Pulsar broker's internal client to load policies from the
+     * system topic `persistent://tenant/namespace/__change_event`.
+     *
+     * NOTE: If local policies are not available, it will fallback to using topic global policies.
+     * @param topicName The name of the topic.
+     * @return A CompletableFuture containing an Optional of TopicPolicies.
+     * @throws NullPointerException If the topicName is null.
+     */
+    @Nonnull
+    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName);
+
     /**
      * Get policies for a topic without cache async.
      * @param topicName topic name
@@ -162,6 +190,19 @@ public interface TopicPoliciesService {
             return null;
         }
 
+        @NotNull
+        @Override
+        public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName,
+                                                                                boolean isGlobal) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+
+        @NotNull
+        @Override
+        public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+
         @Override
         public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
             return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index a4f6bd4650f..25ad6cab942 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -127,6 +127,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     @Override
     @BeforeMethod
     protected void setup() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(false);
         super.internalSetup();
         persistentTopics = spy(PersistentTopics.class);
         persistentTopics.setServletContext(new MockServletContext());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 87471f4972f..faf141a5d1c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -180,7 +180,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
         //make sure namespace policy reader is fully started.
         Awaitility.await().untilAsserted(()-> {
-            assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
+            assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
         });
 
         //load the topic.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
new file mode 100644
index 00000000000..672fc2c95f8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Test(groups = "broker-admin")
+public class TopicPoliciesWithBrokerRestartTest extends MockedPulsarServiceBaseTest {
+
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+
+    @Test
+    public void testRetentionWithBrokerRestart() throws Exception {
+        final int messages = 1_000;
+        final int topicNum = 500;
+        // (1) Init topic
+        admin.namespaces().createNamespace("public/retention");
+        final String topicName = "persistent://public/retention/retention_with_broker_restart";
+        admin.topics().createNonPartitionedTopic(topicName);
+        for (int i = 0; i < topicNum; i++) {
+            final String shadowTopicNames = topicName + "_" + i;
+            admin.topics().createNonPartitionedTopic(shadowTopicNames);
+        }
+        // (2) Set retention
+        final RetentionPolicies retentionPolicies = new RetentionPolicies(20, 20);
+        for (int i = 0; i < topicNum; i++) {
+            final String shadowTopicNames = topicName + "_" + i;
+            admin.topicPolicies().setRetention(shadowTopicNames, retentionPolicies);
+        }
+        admin.topicPolicies().setRetention(topicName, retentionPolicies);
+        // (3) Send messages
+        @Cleanup
+        final Producer<byte[]> publisher = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        for (int i = 0; i < messages; i++) {
+            publisher.send((i + "").getBytes(StandardCharsets.UTF_8));
+        }
+        // (4) Check configuration
+        Awaitility.await().untilAsserted(() -> {
+            final PersistentTopic persistentTopic1 = (PersistentTopic)
+                    pulsar.getBrokerService().getTopic(topicName, true).join().get();
+            final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
+            Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20);
+            Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(),
+                    TimeUnit.MINUTES.toMillis(20));
+        });
+        // (5) Restart broker
+        restartBroker();
+        // (6) Check configuration again
+        for (int i = 0; i < topicNum; i++) {
+            final String shadowTopicNames = topicName + "_" + i;
+            admin.lookups().lookupTopic(shadowTopicNames);
+            final PersistentTopic persistentTopicTmp = (PersistentTopic)
+                    pulsar.getBrokerService().getTopic(shadowTopicNames, true).join().get();
+            final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl) persistentTopicTmp.getManagedLedger();
+            Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20);
+            Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(),
+                    TimeUnit.MINUTES.toMillis(20));
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
index efd8b66d754..234af7afa8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
@@ -84,6 +84,8 @@ public class TopicsAuthTest extends MockedPulsarServiceBaseTest {
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderToken.class.getName());
         conf.setAuthenticationProviders(providers);
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
         super.internalSetup();
         PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
                 ? brokerUrl.toString() : brokerUrlTls.toString())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
index 6ffcecbeb9f..942a42fa7aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
@@ -60,6 +60,8 @@ public class AuthLogsTest extends MockedPulsarServiceBaseTest {
         conf.setAuthorizationEnabled(true);
         conf.setAuthorizationAllowWildcardsMatching(true);
         conf.setSuperUserRoles(Sets.newHashSet("super"));
+        conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName());
+        conf.setBrokerClientAuthenticationParameters("user:pass.pass");
         internalSetup();
 
         try (PulsarAdmin admin = PulsarAdmin.builder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
index 0b1726617f7..25ac59796b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
@@ -29,7 +29,10 @@ import org.slf4j.LoggerFactory;
 
 public class MockAuthentication implements Authentication {
     private static final Logger log = LoggerFactory.getLogger(MockAuthentication.class);
-    private final String user;
+    private String user;
+
+    public MockAuthentication() {
+    }
 
     public MockAuthentication(String user) {
         this.user = user;
@@ -67,6 +70,7 @@ public class MockAuthentication implements Authentication {
 
     @Override
     public void configure(Map<String, String> authParams) {
+        this.user = authParams.get("user");
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c32d3fc3b0b..53f6af82bee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -39,6 +39,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.TimeoutHandler;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -48,6 +49,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -231,6 +234,22 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
 
     protected final void init() throws Exception {
         doInitConf();
+        // trying to config the broker internal client
+        if (conf.getWebServicePortTls().isPresent()
+            && conf.getAuthenticationProviders().contains(AuthenticationProviderTls.class.getName())
+            && !conf.isTlsEnabledWithKeyStore()) {
+            // enabled TLS
+            if (conf.getBrokerClientAuthenticationPlugin() == null
+                || conf.getBrokerClientAuthenticationPlugin().equals(AuthenticationDisabled.class.getName())) {
+                conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+                conf.setBrokerClientAuthenticationParameters("tlsCertFile:" + BROKER_CERT_FILE_PATH
+                                                             + ",tlsKeyFile:" + BROKER_KEY_FILE_PATH);
+                conf.setBrokerClientTlsEnabled(true);
+                conf.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
+                conf.setBrokerClientCertificateFilePath(BROKER_CERT_FILE_PATH);
+                conf.setBrokerClientKeyFilePath(BROKER_KEY_FILE_PATH);
+            }
+        }
         startBroker();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 951892f4ebf..5252407892e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -304,6 +304,7 @@ public class BrokerBookieIsolationTest {
                 bookies[3].getBookieId());
 
         ServiceConfiguration config = new ServiceConfiguration();
+        config.setTopicLevelPoliciesEnabled(false);
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
@@ -612,9 +613,9 @@ public class BrokerBookieIsolationTest {
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
+        config.setTopicLevelPoliciesEnabled(false);
         config.setAdvertisedAddress("localhost");
         config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
-
         config.setManagedLedgerDefaultEnsembleSize(2);
         config.setManagedLedgerDefaultWriteQuorum(2);
         config.setManagedLedgerDefaultAckQuorum(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 31b5bcb23cd..5b70ff99675 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -141,7 +141,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         // Wait for all topic policies updated.
         Awaitility.await().untilAsserted(() ->
                 Assert.assertTrue(systemTopicBasedTopicPoliciesService
-                        .getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
+                        .getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone()));
 
         // Assert broker is cache all topic policies
         Awaitility.await().untilAsserted(() ->
@@ -304,8 +304,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
     @Test
     public void testGetPolicyTimeout() throws Exception {
         SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
-        Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject())));
-        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
+        Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone()));
+        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new CompletableFuture<>());
         long start = System.currentTimeMillis();
         Backoff backoff = new BackoffBuilder()
                 .setInitialTime(500, TimeUnit.MILLISECONDS)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index e29d015c45d..9995b6a28a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -45,6 +45,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -620,7 +621,7 @@ public class PersistentTopicTest extends BrokerTestBase {
         doReturn(policiesService).when(pulsar).getTopicPoliciesService();
         TopicPolicies policies = new TopicPolicies();
         policies.setRetentionPolicies(retentionPolicies);
-        doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
+        doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService).getTopicPoliciesAsync(TopicName.get(topic));
         persistentTopic.onUpdate(policies);
         verify(persistentTopic, times(1)).checkPersistencePolicies();
         Awaitility.await().untilAsserted(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index e3bd321d763..042c9b328d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.tls.PublicSuffixMatcher;
 import org.apache.pulsar.common.tls.TlsHostnameVerifier;
+import org.assertj.core.util.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -141,6 +142,7 @@ public class AuthenticationTlsHostnameVerificationTest extends ProducerConsumerB
         // setup broker cert which has CN = "pulsar" different than broker's hostname="localhost"
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
+        conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
         conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
         conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH);
@@ -182,6 +184,7 @@ public class AuthenticationTlsHostnameVerificationTest extends ProducerConsumerB
         // setup broker cert which has CN = "localhost"
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
+        conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
         conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
         conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index bdbe8efc8e6..9a36e0683b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -119,6 +119,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
     public void testProducerAndConsumerAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
         setup();
 
@@ -179,6 +180,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
     public void testSubscriberPermission() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setEnablePackagesManagement(true);
         conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
         conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
@@ -369,6 +371,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
     public void testClearBacklogPermission() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
         setup();
 
@@ -610,6 +613,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
     public void testSubscriptionPrefixAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
         setup();
 
@@ -749,6 +753,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
     public void testPermissionForProducerCreateInitialSubscription() throws Exception {
         log.info("-- Starting {} test --", methodName);
         cleanup();
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
         setup();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 2fc8aebf64a..81d65b19204 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -195,7 +195,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("admin");
         conf.setSuperUserRoles(superUserRoles);
-
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setAuthorizationEnabled(true);
         conf.setAuthenticationEnabled(true);
         Set<String> providersClassNames = Sets.newHashSet(MutualAuthenticationProvider.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 87f12e6acdc..4d5e7deaf7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.slf4j.Logger;
@@ -92,6 +93,8 @@ public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderToken.class.getName());
         conf.setAuthenticationProviders(providers);
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
 
         conf.setClusterName("test");
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index fdf41c4a6ad..ba43ee6d6a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -27,7 +27,9 @@ import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
 import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,11 +90,12 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
         conf.setAuthenticationProviders(providers);
 
         conf.setBrokerClientAuthenticationPlugin(AuthenticationOAuth2.class.getName());
-        conf.setBrokerClientAuthenticationParameters("{\n"
-                + "  \"privateKey\": \"" + CREDENTIALS_FILE + "\",\n"
-                + "  \"issuerUrl\": \"" + server.getIssuer() + "\",\n"
-                + "  \"audience\": \"" + audience + "\",\n"
-                + "}\n");
+        final Map<String, String> oauth2Param = new HashMap<>();
+        oauth2Param.put("privateKey", CREDENTIALS_FILE);
+        oauth2Param.put("issuerUrl", server.getIssuer());
+        oauth2Param.put("audience", audience);
+        conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+                .getMapper().getObjectMapper().writeValueAsString(oauth2Param));
 
         conf.setClusterName("test");
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
index 8e508b6cf20..77405e14201 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
@@ -32,6 +32,7 @@ import java.util.function.Supplier;
 
 import io.jsonwebtoken.SignatureAlgorithm;
 import lombok.Cleanup;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -49,6 +50,7 @@ import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -83,6 +85,7 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest extends ProducerConsume
         super.internalCleanup();
     }
 
+    @SneakyThrows
     protected void internalSetUpForBroker() {
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
@@ -114,6 +117,25 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest extends ProducerConsume
 
         conf.setAuthenticationProviders(providers);
         conf.setNumExecutorThreadPoolSize(5);
+        Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+        tlsProtocols.add("TLSv1.3");
+        tlsProtocols.add("TLSv1.2");
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+        authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+        conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory.getMapper()
+                .getObjectMapper().writeValueAsString(authParams));
+        conf.setBrokerClientTlsEnabled(true);
+        conf.setBrokerClientTlsEnabledWithKeyStore(true);
+        conf.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
+        conf.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
+        conf.setBrokerClientTlsKeyStore(CLIENT_KEYSTORE_FILE_PATH);
+        conf.setBrokerClientTlsKeyStoreType(KEYSTORE_TYPE);
+        conf.setBrokerClientTlsKeyStorePassword(CLIENT_KEYSTORE_PW);
+        conf.setBrokerClientTlsProtocols(tlsProtocols);
+
     }
 
     protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index 76936334eb0..b9139dabdf0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -85,6 +85,7 @@ public class PatternTopicsConsumerImplAuthTest extends ProducerConsumerBase {
         // set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
         isTcpLookup = true;
 
+        conf.setTopicLevelPoliciesEnabled(false);
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
 
diff --git a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
index d9411e655ad..4e2fd402983 100644
--- a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
+++ b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
@@ -29,4 +29,5 @@ authenticationEnabled=true
 authenticationProviders=org.apache.pulsar.MockTokenAuthenticationProvider
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
-loadBalancerOverrideBrokerNicSpeedGbps=2
\ No newline at end of file
+loadBalancerOverrideBrokerNicSpeedGbps=2
+topicLevelPoliciesEnabled=false
\ No newline at end of file
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 8229d929ee5..9c8e5197adf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -168,7 +168,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
 		conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
 		// Expires after an hour
 		conf.setBrokerClientAuthenticationParameters(
-				"entityType:broker,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000));
+				"entityType:admin,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000));
 
 		Set<String> superUserRoles = new HashSet<>();
 		superUserRoles.add("admin");
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 99af3b1cf6a..b7cfb874747 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -53,7 +53,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
         conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
-        conf.setBrokerClientAuthenticationParameters("authParam:broker");
+        conf.setBrokerClientAuthenticationParameters("authParam:admin");
         conf.setAuthenticateOriginalAuthData(true);
 
         Set<String> superUserRoles = new HashSet<String>();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 2c8c382b6a5..3259cfd95c7 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -144,7 +144,7 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
         conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
-        conf.setBrokerClientAuthenticationParameters("authParam:broker");
+        conf.setBrokerClientAuthenticationParameters("authParam:admin");
 
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("admin");
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index e8bb128c8c1..2d97a4b06a8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -78,6 +78,8 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
     protected void setup() throws Exception {
 
         // enable tls and auth&auth at broker
+        conf.setTopicLevelPoliciesEnabled(false);
+
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
index 9119ffed4e2..7b550b7270f 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -63,6 +64,9 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
         conf.setProperties(properties);
         conf.setSuperUserRoles(Sets.newHashSet(SUPER_USER_ROLE));
         conf.setClusterName("c1");
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + AuthTokenUtils
+                .createToken(secretKey, SUPER_USER_ROLE, Optional.empty()));
         internalSetup();
 
         admin.clusters().createCluster("c1", ClusterData.builder().build());
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 057039edc3b..49e5ae37834 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -89,6 +89,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
                 "org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder");
         brokerEnvs.put("forceDeleteNamespaceAllowed", "true");
         brokerEnvs.put("loadBalancerDebugModeEnabled", "true");
+        brokerEnvs.put("topicLevelPoliciesEnabled", "false");
         brokerEnvs.put("PULSAR_MEM", "-Xmx512M");
         spec.brokerEnvs(brokerEnvs);
         pulsarCluster = PulsarCluster.forSpec(spec);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
index 0a9bb5e1959..87db46f2bb6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
@@ -68,6 +68,7 @@ public class TestPulsarSQLAuth extends TestPulsarSQLBase {
         envMap.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
         envMap.put("superUserRoles", "admin");
         envMap.put("brokerDeleteInactiveTopicsEnabled", "false");
+        envMap.put("topicLevelPoliciesEnabled", "false");
 
         for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
             brokerContainer.withEnv(envMap);