You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2023/10/14 01:45:15 UTC
[pulsar] branch branch-3.1 updated: [fix][broker][branch-3.1] Fix inconsistent topic policy (#21255)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new a88fe1f0029 [fix][broker][branch-3.1] Fix inconsistent topic policy (#21255)
a88fe1f0029 is described below
commit a88fe1f002944db77ec5ecbd8ac61b7e0055404d
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Sat Oct 14 09:45:05 2023 +0800
[fix][broker][branch-3.1] Fix inconsistent topic policy (#21255)
---
.../ProxySaslAuthenticationTest.java | 13 +-
.../authentication/SaslAuthenticateTest.java | 11 +-
.../pulsar/broker/service/BrokerService.java | 295 +++++++++++----------
.../SystemTopicBasedTopicPoliciesService.java | 100 +++++--
.../broker/service/TopicPoliciesService.java | 41 +++
.../pulsar/broker/admin/PersistentTopicsTest.java | 1 +
.../pulsar/broker/admin/TopicPoliciesTest.java | 2 +-
.../admin/TopicPoliciesWithBrokerRestartTest.java | 104 ++++++++
.../apache/pulsar/broker/admin/TopicsAuthTest.java | 2 +
.../apache/pulsar/broker/auth/AuthLogsTest.java | 2 +
.../pulsar/broker/auth/MockAuthentication.java | 6 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 19 ++
.../broker/service/BrokerBookieIsolationTest.java | 3 +-
.../SystemTopicBasedTopicPoliciesServiceTest.java | 6 +-
.../service/persistent/PersistentTopicTest.java | 3 +-
.../AuthenticationTlsHostnameVerificationTest.java | 3 +
.../api/AuthorizationProducerConsumerTest.java | 5 +
.../client/api/MutualAuthenticationTest.java | 2 +-
.../TokenAuthenticatedProducerConsumerTest.java | 3 +
...kenOauth2AuthenticatedProducerConsumerTest.java | 14 +-
...eyStoreTlsProducerConsumerTestWithAuthTest.java | 22 ++
.../impl/PatternTopicsConsumerImplAuthTest.java | 1 +
.../configurations/standalone_no_client_auth.conf | 3 +-
.../proxy/server/ProxyAuthenticationTest.java | 2 +-
.../proxy/server/ProxyForwardAuthDataTest.java | 2 +-
.../proxy/server/ProxyRolesEnforcementTest.java | 2 +-
.../server/ProxyWithAuthorizationNegTest.java | 2 +
.../apache/pulsar/sql/presto/TestPulsarAuth.java | 4 +
.../loadbalance/ExtensibleLoadManagerTest.java | 1 +
.../integration/presto/TestPulsarSQLAuth.java | 1 +
30 files changed, 478 insertions(+), 197 deletions(-)
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index 261efe680f8..f0e45aa734a 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
@@ -193,15 +194,17 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" + kdc.getRealm()));
-
- super.init();
-
- lookupUrl = new URI(pulsar.getBrokerServiceUrl());
-
// set admin auth, to verify admin web resources
Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+ conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+ .getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
+
+ super.init();
+
+ lookupUrl = new URI(pulsar.getBrokerServiceUrl());
log.info("set client jaas section name: PulsarClient");
admin = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index 5cace2221de..230c2ad787d 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.sasl.SaslConstants;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -186,7 +187,12 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + kdc.getRealm()));
-
+ Map<String, String> clientSaslConfig = new HashMap<>();
+ clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+ clientSaslConfig.put("serverType", "broker");
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+ conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+ .getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
super.init();
lookupUrl = new URI(pulsar.getWebServiceAddress());
@@ -197,9 +203,6 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
.authentication(authSasl));
// set admin auth, to verify admin web resources
- Map<String, String> clientSaslConfig = new HashMap<>();
- clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
- clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
admin = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2cf141ed329..6a61e851fbc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1760,165 +1760,172 @@ public class BrokerService implements Closeable {
});
}
- public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName topicName) {
+ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull TopicName topicName) {
+ requireNonNull(topicName);
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources();
LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies();
- return nsr.getPoliciesAsync(namespace)
- .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies, localPolicies) -> {
- PersistencePolicies persistencePolicies = null;
- RetentionPolicies retentionPolicies = null;
- OffloadPoliciesImpl topicLevelOffloadPolicies = null;
-
- if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
- && !NamespaceService.isSystemServiceNamespace(namespace.toString())) {
- final TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
- .getTopicPoliciesIfExists(topicName);
- if (topicPolicies != null) {
- persistencePolicies = topicPolicies.getPersistence();
- retentionPolicies = topicPolicies.getRetentionPolicies();
- topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
- }
- }
-
- if (persistencePolicies == null) {
- persistencePolicies = policies.map(p -> p.persistence).orElseGet(
- () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
- serviceConfig.getManagedLedgerDefaultWriteQuorum(),
- serviceConfig.getManagedLedgerDefaultAckQuorum(),
- serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
- }
+ final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+ if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+ && !NamespaceService.isSystemServiceNamespace(namespace.toString())
+ && !SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
+ topicPoliciesFuture = pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
+ } else {
+ topicPoliciesFuture = CompletableFuture.completedFuture(Optional.empty());
+ }
+ return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> {
+ final CompletableFuture<Optional<Policies>> nsPolicies = nsr.getPoliciesAsync(namespace);
+ final CompletableFuture<Optional<LocalPolicies>> lcPolicies = lpr.getLocalPoliciesAsync(namespace);
+ return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> {
+ PersistencePolicies persistencePolicies = null;
+ RetentionPolicies retentionPolicies = null;
+ OffloadPoliciesImpl topicLevelOffloadPolicies = null;
+ if (topicPoliciesOptional.isPresent()) {
+ final TopicPolicies topicPolicies = topicPoliciesOptional.get();
+ persistencePolicies = topicPolicies.getPersistence();
+ retentionPolicies = topicPolicies.getRetentionPolicies();
+ topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+ }
- if (retentionPolicies == null) {
- retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
- () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
- serviceConfig.getDefaultRetentionSizeInMB())
- );
- }
+ if (persistencePolicies == null) {
+ persistencePolicies = policies.map(p -> p.persistence).orElseGet(
+ () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+ serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+ serviceConfig.getManagedLedgerDefaultAckQuorum(),
+ serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+ }
- ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
- managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
- managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
- managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+ if (retentionPolicies == null) {
+ retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
+ () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+ serviceConfig.getDefaultRetentionSizeInMB())
+ );
+ }
- if (serviceConfig.isStrictBookieAffinityEnabled()) {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+ managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+ managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+
+ if (serviceConfig.isStrictBookieAffinityEnabled()) {
+ managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
+ IsolatedBookieEnsemblePlacementPolicy.class);
+ if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+ properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+ managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else if (isSystemTopic(topicName)) {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "*");
+ properties.put(IsolatedBookieEnsemblePlacementPolicy
+ .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+ managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "");
+ properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
+ managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+ } else {
+ if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
IsolatedBookieEnsemblePlacementPolicy.class);
- if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
- Map<String, Object> properties = new HashMap<>();
- properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
- properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
- managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- } else if (isSystemTopic(topicName)) {
- Map<String, Object> properties = new HashMap<>();
- properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "*");
- properties.put(IsolatedBookieEnsemblePlacementPolicy
- .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
- managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- } else {
- Map<String, Object> properties = new HashMap<>();
- properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "");
- properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
- managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- }
- } else {
- if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
- managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
- IsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = new HashMap<>();
- properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
- properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
- managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- }
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+ properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+ managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}
+ }
- managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
- managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
- managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
- managedLedgerConfig
- .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
- managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
- serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
- managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
- serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
- managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
- managedLedgerConfig
- .setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig
- .setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
- managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
- serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
- managedLedgerConfig
- .setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
- managedLedgerConfig
- .setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
- managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
- managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
- serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
- managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
- managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
- managedLedgerConfig
- .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
- managedLedgerConfig
- .setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
- managedLedgerConfig
- .setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
- managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
- managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
- managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
- managedLedgerConfig.setInactiveLedgerRollOverTime(
- serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
- managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
- serviceConfig.isCacheEvictionByMarkDeletedPosition());
- managedLedgerConfig.setMinimumBacklogCursorsForCaching(
- serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
- managedLedgerConfig.setMinimumBacklogEntriesForCaching(
- serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
- managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
- serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
-
- OffloadPoliciesImpl nsLevelOffloadPolicies =
- (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
- OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(
- topicLevelOffloadPolicies,
- OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
- getPulsar().getConfig().getProperties());
- if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
- managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
- } else {
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
- pulsar().createManagedLedgerOffloader(offloadPolicies);
- managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- throw new RuntimeException(e);
- }
- } else {
- //If the topic level policy is null, use the namespace level
- managedLedgerConfig
- .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+ managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+ managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+ managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+ managedLedgerConfig
+ .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+ managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
+ serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
+ managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
+ serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
+ managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+ managedLedgerConfig
+ .setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig
+ .setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+ managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+ serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+ managedLedgerConfig
+ .setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+ managedLedgerConfig
+ .setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+ managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+ managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+ serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+ managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+ managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ managedLedgerConfig
+ .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+ managedLedgerConfig
+ .setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+ managedLedgerConfig
+ .setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
+ managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+ managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+ managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+ managedLedgerConfig.setInactiveLedgerRollOverTime(
+ serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
+ managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+ serviceConfig.isCacheEvictionByMarkDeletedPosition());
+ managedLedgerConfig.setMinimumBacklogCursorsForCaching(
+ serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
+ managedLedgerConfig.setMinimumBacklogEntriesForCaching(
+ serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
+ managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
+ serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
+
+ OffloadPoliciesImpl nsLevelOffloadPolicies =
+ (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
+ OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(
+ topicLevelOffloadPolicies,
+ OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
+ getPulsar().getConfig().getProperties());
+ if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+ managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+ } else {
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader =
+ pulsar().createManagedLedgerOffloader(offloadPolicies);
+ managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
}
+ } else {
+ //If the topic level policy is null, use the namespace level
+ managedLedgerConfig
+ .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}
+ }
- managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
- serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
- managedLedgerConfig.setNewEntriesCheckDelayInMillis(
- serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
- return managedLedgerConfig;
- });
+ managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+ serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+ managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+ serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+ return managedLedgerConfig;
+ });
+ });
}
private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 09f8de818db..ed76d37ae25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -18,12 +18,14 @@
*/
package org.apache.pulsar.broker.service;
+import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -54,6 +56,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,8 +81,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
readerCaches = new ConcurrentHashMap<>();
- @VisibleForTesting
- final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
+
+ final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new ConcurrentHashMap<>();
@VisibleForTesting
final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
@@ -219,12 +222,12 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
boolean isGlobal) throws TopicPoliciesCacheNotInitException {
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
NamespaceName namespace = topicName.getNamespaceObject();
- prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+ prepareInitPoliciesCacheAsync(namespace);
}
MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result = new MutablePair<>();
policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> {
- if (initialized == null || !initialized) {
+ if (initialized == null || !initialized.isDone()) {
result.setLeft(new TopicPoliciesCacheNotInitException());
} else {
TopicPolicies topicPolicies =
@@ -242,6 +245,34 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
}
+ @NotNull
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName,
+ boolean isGlobal) {
+ requireNonNull(topicName);
+ final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+ return preparedFuture.thenApply(__ -> {
+ final TopicPolicies candidatePolicies = isGlobal
+ ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
+ : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+ return Optional.ofNullable(candidatePolicies);
+ });
+ }
+
+ @NotNull
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName) {
+ requireNonNull(topicName);
+ final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+ return preparedFuture.thenApply(__ -> {
+ final TopicPolicies localPolicies = policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+ if (localPolicies != null) {
+ return Optional.of(localPolicies);
+ }
+ return Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
+ });
+ }
+
@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
@@ -265,39 +296,48 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
@Override
public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
- CompletableFuture<Void> result = new CompletableFuture<>();
NamespaceName namespace = namespaceBundle.getNamespaceObject();
if (NamespaceService.isHeartbeatNamespace(namespace)) {
- result.complete(null);
- return result;
+ return CompletableFuture.completedFuture(null);
}
synchronized (this) {
if (readerCaches.get(namespace) != null) {
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
- result.complete(null);
+ return CompletableFuture.completedFuture(null);
} else {
- prepareInitPoliciesCache(namespace, result);
+ return prepareInitPoliciesCacheAsync(namespace);
}
}
- return result;
}
- private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
- if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
- CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
+ private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+ requireNonNull(namespace);
+ return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+ final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
- readerCompletableFuture.thenAccept(reader -> {
- initPolicesCache(reader, result);
- result.thenRun(() -> readMorePolicies(reader));
- }).exceptionally(ex -> {
- log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
- cleanCacheAndCloseReader(namespace, false);
- result.completeExceptionally(ex);
+ final CompletableFuture<Void> initFuture = readerCompletableFuture
+ .thenCompose(reader -> {
+ final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
+ initPolicesCache(reader, stageFuture);
+ return stageFuture
+ // Read policies in background
+ .thenAccept(__ -> readMorePoliciesAsync(reader));
+ });
+ initFuture.exceptionally(ex -> {
+ try {
+ log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+ cleanCacheAndCloseReader(namespace, false);
+ } catch (Throwable cleanupEx) {
+ // Adding this catch to avoid break callback chain
+ log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx);
+ }
return null;
});
- }
+ // let caller know we've got an exception.
+ return initFuture;
+ });
}
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
@@ -381,8 +421,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
if (log.isDebugEnabled()) {
log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
}
- policyCacheInitMap.computeIfPresent(
- reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+
// replay policy message
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
@@ -395,6 +434,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
}
}));
+
future.complete(null);
}
});
@@ -420,7 +460,13 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
});
}
- private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
+ /**
+ * This is an async method for the background reader to continue syncing new messages.
+ *
+ * Note: You should not do any blocking call here. because it will affect
+ * #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic.
+ */
+ private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) {
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
@@ -428,7 +474,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
})
.whenComplete((__, ex) -> {
if (ex == null) {
- readMorePolicies(reader);
+ readMorePoliciesAsync(reader);
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof PulsarClientException.AlreadyClosedException) {
@@ -437,7 +483,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
log.warn("Read more topic polices exception, read again.", ex);
- readMorePolicies(reader);
+ readMorePoliciesAsync(reader);
}
}
});
@@ -605,7 +651,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
@VisibleForTesting
- public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
+ public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName namespaceName) {
return policyCacheInitMap.get(namespaceName);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index c4bcc0c3935..aa3a6aaeff2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -22,6 +22,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
+import org.jetbrains.annotations.NotNull;
/**
* Topic policies service.
@@ -109,6 +111,32 @@ public interface TopicPoliciesService {
return response;
}
+ /**
+ * Asynchronously retrieves topic policies.
+ * This triggers the Pulsar broker's internal client to load policies from the
+ * system topic `persistent://tenant/namespace/__change_event`.
+ *
+ * @param topicName The name of the topic.
+ * @param isGlobal Indicates if the policies are global.
+ * @return A CompletableFuture containing an Optional of TopicPolicies.
+ * @throws NullPointerException If the topicName is null.
+ */
+ @Nonnull
+ CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal);
+
+ /**
+ * Asynchronously retrieves topic policies.
+ * This triggers the Pulsar broker's internal client to load policies from the
+ * system topic `persistent://tenant/namespace/__change_event`.
+ *
+ * NOTE: If local policies are not available, it will fallback to using topic global policies.
+ * @param topicName The name of the topic.
+ * @return A CompletableFuture containing an Optional of TopicPolicies.
+ * @throws NullPointerException If the topicName is null.
+ */
+ @Nonnull
+ CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName);
+
/**
* Get policies for a topic without cache async.
* @param topicName topic name
@@ -162,6 +190,19 @@ public interface TopicPoliciesService {
return null;
}
+ @NotNull
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName,
+ boolean isGlobal) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+
+ @NotNull
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+
@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index a4f6bd4650f..25ad6cab942 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -127,6 +127,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
@Override
@BeforeMethod
protected void setup() throws Exception {
+ conf.setTopicLevelPoliciesEnabled(false);
super.internalSetup();
persistentTopics = spy(PersistentTopics.class);
persistentTopics.setServletContext(new MockServletContext());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 87471f4972f..faf141a5d1c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -180,7 +180,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
//make sure namespace policy reader is fully started.
Awaitility.await().untilAsserted(()-> {
- assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
+ assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
});
//load the topic.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
new file mode 100644
index 00000000000..672fc2c95f8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Test(groups = "broker-admin")
+public class TopicPoliciesWithBrokerRestartTest extends MockedPulsarServiceBaseTest {
+
+ @Override
+ @BeforeClass(alwaysRun = true)
+ protected void setup() throws Exception {
+ super.internalSetup();
+ setupDefaultTenantAndNamespace();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+
+ @Test
+ public void testRetentionWithBrokerRestart() throws Exception {
+ final int messages = 1_000;
+ final int topicNum = 500;
+ // (1) Init topic
+ admin.namespaces().createNamespace("public/retention");
+ final String topicName = "persistent://public/retention/retention_with_broker_restart";
+ admin.topics().createNonPartitionedTopic(topicName);
+ for (int i = 0; i < topicNum; i++) {
+ final String shadowTopicNames = topicName + "_" + i;
+ admin.topics().createNonPartitionedTopic(shadowTopicNames);
+ }
+ // (2) Set retention
+ final RetentionPolicies retentionPolicies = new RetentionPolicies(20, 20);
+ for (int i = 0; i < topicNum; i++) {
+ final String shadowTopicNames = topicName + "_" + i;
+ admin.topicPolicies().setRetention(shadowTopicNames, retentionPolicies);
+ }
+ admin.topicPolicies().setRetention(topicName, retentionPolicies);
+ // (3) Send messages
+ @Cleanup
+ final Producer<byte[]> publisher = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+ for (int i = 0; i < messages; i++) {
+ publisher.send((i + "").getBytes(StandardCharsets.UTF_8));
+ }
+ // (4) Check configuration
+ Awaitility.await().untilAsserted(() -> {
+ final PersistentTopic persistentTopic1 = (PersistentTopic)
+ pulsar.getBrokerService().getTopic(topicName, true).join().get();
+ final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
+ Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20);
+ Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(),
+ TimeUnit.MINUTES.toMillis(20));
+ });
+ // (5) Restart broker
+ restartBroker();
+ // (6) Check configuration again
+ for (int i = 0; i < topicNum; i++) {
+ final String shadowTopicNames = topicName + "_" + i;
+ admin.lookups().lookupTopic(shadowTopicNames);
+ final PersistentTopic persistentTopicTmp = (PersistentTopic)
+ pulsar.getBrokerService().getTopic(shadowTopicNames, true).join().get();
+ final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl) persistentTopicTmp.getManagedLedger();
+ Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20);
+ Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(),
+ TimeUnit.MINUTES.toMillis(20));
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
index efd8b66d754..234af7afa8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java
@@ -84,6 +84,8 @@ public class TopicsAuthTest extends MockedPulsarServiceBaseTest {
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
super.internalSetup();
PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString() : brokerUrlTls.toString())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
index 6ffcecbeb9f..942a42fa7aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java
@@ -60,6 +60,8 @@ public class AuthLogsTest extends MockedPulsarServiceBaseTest {
conf.setAuthorizationEnabled(true);
conf.setAuthorizationAllowWildcardsMatching(true);
conf.setSuperUserRoles(Sets.newHashSet("super"));
+ conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName());
+ conf.setBrokerClientAuthenticationParameters("user:pass.pass");
internalSetup();
try (PulsarAdmin admin = PulsarAdmin.builder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
index 0b1726617f7..25ac59796b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java
@@ -29,7 +29,10 @@ import org.slf4j.LoggerFactory;
public class MockAuthentication implements Authentication {
private static final Logger log = LoggerFactory.getLogger(MockAuthentication.class);
- private final String user;
+ private String user;
+
+ public MockAuthentication() {
+ }
public MockAuthentication(String user) {
this.user = user;
@@ -67,6 +70,7 @@ public class MockAuthentication implements Authentication {
@Override
public void configure(Map<String, String> authParams) {
+ this.user = authParams.get("user");
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c32d3fc3b0b..53f6af82bee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -39,6 +39,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.TimeoutHandler;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -48,6 +49,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -231,6 +234,22 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
protected final void init() throws Exception {
doInitConf();
+ // trying to config the broker internal client
+ if (conf.getWebServicePortTls().isPresent()
+ && conf.getAuthenticationProviders().contains(AuthenticationProviderTls.class.getName())
+ && !conf.isTlsEnabledWithKeyStore()) {
+ // enabled TLS
+ if (conf.getBrokerClientAuthenticationPlugin() == null
+ || conf.getBrokerClientAuthenticationPlugin().equals(AuthenticationDisabled.class.getName())) {
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+ conf.setBrokerClientAuthenticationParameters("tlsCertFile:" + BROKER_CERT_FILE_PATH
+ + ",tlsKeyFile:" + BROKER_KEY_FILE_PATH);
+ conf.setBrokerClientTlsEnabled(true);
+ conf.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
+ conf.setBrokerClientCertificateFilePath(BROKER_CERT_FILE_PATH);
+ conf.setBrokerClientKeyFilePath(BROKER_KEY_FILE_PATH);
+ }
+ }
startBroker();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 951892f4ebf..5252407892e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -304,6 +304,7 @@ public class BrokerBookieIsolationTest {
bookies[3].getBookieId());
ServiceConfiguration config = new ServiceConfiguration();
+ config.setTopicLevelPoliciesEnabled(false);
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
@@ -612,9 +613,9 @@ public class BrokerBookieIsolationTest {
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
+ config.setTopicLevelPoliciesEnabled(false);
config.setAdvertisedAddress("localhost");
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
-
config.setManagedLedgerDefaultEnsembleSize(2);
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 31b5bcb23cd..5b70ff99675 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -141,7 +141,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
// Wait for all topic policies updated.
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(systemTopicBasedTopicPoliciesService
- .getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
+ .getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone()));
// Assert broker is cache all topic policies
Awaitility.await().untilAsserted(() ->
@@ -304,8 +304,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
@Test
public void testGetPolicyTimeout() throws Exception {
SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
- Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject())));
- service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
+ Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone()));
+ service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new CompletableFuture<>());
long start = System.currentTimeMillis();
Backoff backoff = new BackoffBuilder()
.setInitialTime(500, TimeUnit.MILLISECONDS)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index e29d015c45d..9995b6a28a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -45,6 +45,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -620,7 +621,7 @@ public class PersistentTopicTest extends BrokerTestBase {
doReturn(policiesService).when(pulsar).getTopicPoliciesService();
TopicPolicies policies = new TopicPolicies();
policies.setRetentionPolicies(retentionPolicies);
- doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
+ doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService).getTopicPoliciesAsync(TopicName.get(topic));
persistentTopic.onUpdate(policies);
verify(persistentTopic, times(1)).checkPersistencePolicies();
Awaitility.await().untilAsserted(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
index e3bd321d763..042c9b328d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.tls.PublicSuffixMatcher;
import org.apache.pulsar.common.tls.TlsHostnameVerifier;
+import org.assertj.core.util.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -141,6 +142,7 @@ public class AuthenticationTlsHostnameVerificationTest extends ProducerConsumerB
// setup broker cert which has CN = "pulsar" different than broker's hostname="localhost"
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
+ conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH);
@@ -182,6 +184,7 @@ public class AuthenticationTlsHostnameVerificationTest extends ProducerConsumerB
// setup broker cert which has CN = "localhost"
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
+ conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName()));
conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index bdbe8efc8e6..9a36e0683b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -119,6 +119,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
public void testProducerAndConsumerAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
+ conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
setup();
@@ -179,6 +180,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
public void testSubscriberPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
+ conf.setTopicLevelPoliciesEnabled(false);
conf.setEnablePackagesManagement(true);
conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
@@ -369,6 +371,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
public void testClearBacklogPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
+ conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();
@@ -610,6 +613,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
+ conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
setup();
@@ -749,6 +753,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
public void testPermissionForProducerCreateInitialSubscription() throws Exception {
log.info("-- Starting {} test --", methodName);
cleanup();
+ conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 2fc8aebf64a..81d65b19204 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -195,7 +195,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("admin");
conf.setSuperUserRoles(superUserRoles);
-
+ conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthorizationEnabled(true);
conf.setAuthenticationEnabled(true);
Set<String> providersClassNames = Sets.newHashSet(MutualAuthenticationProvider.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 87f12e6acdc..4d5e7deaf7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
@@ -92,6 +93,8 @@ public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
conf.setClusterName("test");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index fdf41c4a6ad..ba43ee6d6a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -27,7 +27,9 @@ import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,11 +90,12 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
conf.setAuthenticationProviders(providers);
conf.setBrokerClientAuthenticationPlugin(AuthenticationOAuth2.class.getName());
- conf.setBrokerClientAuthenticationParameters("{\n"
- + " \"privateKey\": \"" + CREDENTIALS_FILE + "\",\n"
- + " \"issuerUrl\": \"" + server.getIssuer() + "\",\n"
- + " \"audience\": \"" + audience + "\",\n"
- + "}\n");
+ final Map<String, String> oauth2Param = new HashMap<>();
+ oauth2Param.put("privateKey", CREDENTIALS_FILE);
+ oauth2Param.put("issuerUrl", server.getIssuer());
+ oauth2Param.put("audience", audience);
+ conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+ .getMapper().getObjectMapper().writeValueAsString(oauth2Param));
conf.setClusterName("test");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
index 8e508b6cf20..77405e14201 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java
@@ -32,6 +32,7 @@ import java.util.function.Supplier;
import io.jsonwebtoken.SignatureAlgorithm;
import lombok.Cleanup;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -49,6 +50,7 @@ import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -83,6 +85,7 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest extends ProducerConsume
super.internalCleanup();
}
+ @SneakyThrows
protected void internalSetUpForBroker() {
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
@@ -114,6 +117,25 @@ public class KeyStoreTlsProducerConsumerTestWithAuthTest extends ProducerConsume
conf.setAuthenticationProviders(providers);
conf.setNumExecutorThreadPoolSize(5);
+ Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+ tlsProtocols.add("TLSv1.3");
+ tlsProtocols.add("TLSv1.2");
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+ conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory.getMapper()
+ .getObjectMapper().writeValueAsString(authParams));
+ conf.setBrokerClientTlsEnabled(true);
+ conf.setBrokerClientTlsEnabledWithKeyStore(true);
+ conf.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
+ conf.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
+ conf.setBrokerClientTlsKeyStore(CLIENT_KEYSTORE_FILE_PATH);
+ conf.setBrokerClientTlsKeyStoreType(KEYSTORE_TYPE);
+ conf.setBrokerClientTlsKeyStorePassword(CLIENT_KEYSTORE_PW);
+ conf.setBrokerClientTlsProtocols(tlsProtocols);
+
}
protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
index 76936334eb0..b9139dabdf0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java
@@ -85,6 +85,7 @@ public class PatternTopicsConsumerImplAuthTest extends ProducerConsumerBase {
// set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
isTcpLookup = true;
+ conf.setTopicLevelPoliciesEnabled(false);
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
diff --git a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
index d9411e655ad..4e2fd402983 100644
--- a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
+++ b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
@@ -29,4 +29,5 @@ authenticationEnabled=true
authenticationProviders=org.apache.pulsar.MockTokenAuthenticationProvider
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
-loadBalancerOverrideBrokerNicSpeedGbps=2
\ No newline at end of file
+loadBalancerOverrideBrokerNicSpeedGbps=2
+topicLevelPoliciesEnabled=false
\ No newline at end of file
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 8229d929ee5..9c8e5197adf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -168,7 +168,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
// Expires after an hour
conf.setBrokerClientAuthenticationParameters(
- "entityType:broker,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000));
+ "entityType:admin,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000));
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("admin");
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 99af3b1cf6a..b7cfb874747 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -53,7 +53,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
- conf.setBrokerClientAuthenticationParameters("authParam:broker");
+ conf.setBrokerClientAuthenticationParameters("authParam:admin");
conf.setAuthenticateOriginalAuthData(true);
Set<String> superUserRoles = new HashSet<String>();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 2c8c382b6a5..3259cfd95c7 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -144,7 +144,7 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
- conf.setBrokerClientAuthenticationParameters("authParam:broker");
+ conf.setBrokerClientAuthenticationParameters("authParam:admin");
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("admin");
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index e8bb128c8c1..2d97a4b06a8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -78,6 +78,8 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
protected void setup() throws Exception {
// enable tls and auth&auth at broker
+ conf.setTopicLevelPoliciesEnabled(false);
+
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
index 9119ffed4e2..7b550b7270f 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -63,6 +64,9 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
conf.setProperties(properties);
conf.setSuperUserRoles(Sets.newHashSet(SUPER_USER_ROLE));
conf.setClusterName("c1");
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters("token:" + AuthTokenUtils
+ .createToken(secretKey, SUPER_USER_ROLE, Optional.empty()));
internalSetup();
admin.clusters().createCluster("c1", ClusterData.builder().build());
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 057039edc3b..49e5ae37834 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -89,6 +89,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
"org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder");
brokerEnvs.put("forceDeleteNamespaceAllowed", "true");
brokerEnvs.put("loadBalancerDebugModeEnabled", "true");
+ brokerEnvs.put("topicLevelPoliciesEnabled", "false");
brokerEnvs.put("PULSAR_MEM", "-Xmx512M");
spec.brokerEnvs(brokerEnvs);
pulsarCluster = PulsarCluster.forSpec(spec);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
index 0a9bb5e1959..87db46f2bb6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
@@ -68,6 +68,7 @@ public class TestPulsarSQLAuth extends TestPulsarSQLBase {
envMap.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
envMap.put("superUserRoles", "admin");
envMap.put("brokerDeleteInactiveTopicsEnabled", "false");
+ envMap.put("topicLevelPoliciesEnabled", "false");
for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
brokerContainer.withEnv(envMap);