You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/22 03:30:01 UTC

(pulsar) branch master updated: [fix][broker] Unify topic-level policies enable judgment conditions (#19501)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 153cd5e429f [fix][broker] Unify topic-level policies enable judgment conditions (#19501)
153cd5e429f is described below

commit 153cd5e429fdfbde1266b07f57de692805fed538
Author: AloysZhang <lo...@gmail.com>
AuthorDate: Mon Jan 22 11:29:54 2024 +0800

    [fix][broker] Unify topic-level policies enable judgment conditions (#19501)
    
    Co-authored-by: Jiwe Guo <te...@apache.org>
---
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java    | 4 ++++
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java       | 2 +-
 .../src/main/java/org/apache/pulsar/broker/admin/AdminResource.java | 4 ++--
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java   | 2 +-
 .../main/java/org/apache/pulsar/broker/service/AbstractTopic.java   | 6 ++----
 .../main/java/org/apache/pulsar/broker/service/BrokerService.java   | 6 +++---
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java    | 3 +--
 7 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4aed15922f0..117ccc9661f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3554,4 +3554,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
         return numWorkerThreadsForNonPersistentTopic > 0
                 ? numWorkerThreadsForNonPersistentTopic : topicOrderedExecutorThreadNum;
     }
+
+    public boolean isSystemTopicAndTopicLevelPoliciesEnabled() {
+        return topicLevelPoliciesEnabled && systemTopicEnabled;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 83a91e6971e..054411c49f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -864,7 +864,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             this.nsService.initialize();
 
             // Start topic level policies service
-            if (config.isTopicLevelPoliciesEnabled() && config.isSystemTopicEnabled()) {
+            if (config.isSystemTopicAndTopicLevelPoliciesEnabled()) {
                 this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this);
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1526ae18a90..54bc0077103 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -395,7 +395,7 @@ public abstract class AdminResource extends PulsarWebResource {
     }
 
     protected void checkTopicLevelPolicyEnable() {
-        if (!config().isTopicLevelPoliciesEnabled()) {
+        if (!config().isSystemTopicAndTopicLevelPoliciesEnabled()) {
             throw new RestException(Status.METHOD_NOT_ALLOWED,
                     "Topic level policies is disabled, to enable the topic level policy and retry.");
         }
@@ -727,7 +727,7 @@ public abstract class AdminResource extends PulsarWebResource {
 
     protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsyncWithoutAuth() {
         CompletableFuture<SchemaCompatibilityStrategy> future = CompletableFuture.completedFuture(null);
-        if (config().isTopicLevelPoliciesEnabled()) {
+        if (config().isSystemTopicAndTopicLevelPoliciesEnabled()) {
             future = getTopicPoliciesAsyncWithRetry(topicName)
                     .thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b7a654cc0a4..06584344b9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3720,7 +3720,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> preValidation(boolean authoritative) {
-        if (!config().isTopicLevelPoliciesEnabled()) {
+        if (!config().isSystemTopicAndTopicLevelPoliciesEnabled()) {
             return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
                     "Topic level policies is disabled, to enable the topic level policy and retry."));
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index fb22827941e..65f2bc5a402 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -469,16 +469,14 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     }
 
     protected void registerTopicPolicyListener() {
-        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
-                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+        if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
             brokerService.getPulsar().getTopicPoliciesService()
                     .registerListener(TopicName.getPartitionedTopicName(topic), this);
         }
     }
 
     protected void unregisterTopicPolicyListener() {
-        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
-                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+        if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
             brokerService.getPulsar().getTopicPoliciesService()
                     .unregisterListener(TopicName.getPartitionedTopicName(topic), this);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1d50c2b4bae..8654a830050 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1063,7 +1063,7 @@ public class BrokerService implements Closeable {
     private CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) {
         Objects.requireNonNull(topicName);
         final ServiceConfiguration serviceConfiguration = pulsar.getConfiguration();
-        if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()
+        if (serviceConfiguration.isSystemTopicAndTopicLevelPoliciesEnabled()
                 && !NamespaceService.isSystemServiceNamespace(topicName.getNamespace())
                 && !SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
             return pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
@@ -3458,7 +3458,7 @@ public class BrokerService implements Closeable {
      * @return TopicPolicies, if they exist. Otherwise, the value will not be present.
      */
     public Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
-        if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+        if (!pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
             return Optional.empty();
         }
         return Optional.ofNullable(pulsar.getTopicPoliciesService()
@@ -3467,7 +3467,7 @@ public class BrokerService implements Closeable {
 
     public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
         final PulsarService pulsarService = pulsar();
-        if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
+        if (!pulsarService.getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
         return pulsar.getTopicPoliciesService()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1088529413d..a375ebf2809 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3999,8 +3999,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     protected CompletableFuture<Void> initTopicPolicy() {
-        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
-                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+        if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
             brokerService.getPulsar().getTopicPoliciesService()
                     .registerListener(TopicName.getPartitionedTopicName(topic), this);
             return CompletableFuture.completedFuture(null).thenRunAsync(() -> onUpdate(