You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/06 03:26:12 UTC

[pulsar] branch branch-2.7 updated: [Branch-2.7][Broker] Fix using partitioned topic name to get topic policies (#11897)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 3876c56  [Branch-2.7][Broker] Fix using partitioned topic name to get topic policies (#11897)
3876c56 is described below

commit 3876c56e6a6a0614dd5d5fb11d91fdc9c8f4a259
Author: ran <ga...@126.com>
AuthorDate: Mon Sep 6 11:25:29 2021 +0800

    [Branch-2.7][Broker] Fix using partitioned topic name to get topic policies (#11897)
    
    ### Motivation
    
    There is a bug when using a partitioned topic name to get topic policies. The PR https://github.com/apache/pulsar/pull/11294 fixes this issue, but it is hard to cherry-pick that PR to `branch-2.7`, so this PR was created to fix the issue in `branch-2.7`.
    
    This PR contains [PR-11294](https://github.com/apache/pulsar/pull/11294) and [PR-11863](https://github.com/apache/pulsar/pull/11863).
    
    ### Modifications
    
    1. Fix using the partitioned topic name to get topic policies.
    2. Change some warning logs to debug level in the code paths that get topic policies.
    
    ### Verifying this change
    
    The test method `TopicPoliciesTest#testBacklogQuotaWithPartitionedTopic` is used to verify getting topic policies by the partitioned topic name.
---
 .../pulsar/broker/service/AbstractTopic.java       |  8 ++-----
 .../pulsar/broker/service/BacklogQuotaManager.java |  6 +++++-
 .../pulsar/broker/service/BrokerService.java       | 16 ++++----------
 .../SystemTopicBasedTopicPoliciesService.java      |  4 ++--
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 25 ++++++++++++++++++++++
 .../pulsar/broker/service/BrokerServiceTest.java   |  2 --
 6 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 0d40ecb..23b2221 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -559,14 +559,10 @@ public abstract class AbstractTopic implements Topic {
      * @return TopicPolicies is exist else return null.
      */
     public TopicPolicies getTopicPolicies(TopicName topicName) {
-        TopicName cloneTopicName = topicName;
-        if (topicName.isPartitioned()) {
-            cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-        }
         try {
-            return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+            return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
         } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
+            log.debug("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
             return null;
         } catch (NullPointerException e) {
             log.warn("Topic level policies are not enabled. " +
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 4b965fd..020efa1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -86,8 +86,12 @@ public class BacklogQuotaManager {
                     .map(TopicPolicies::getBackLogQuotaMap)
                     .map(map -> map.get(BacklogQuotaType.destination_storage.name()))
                     .orElseGet(() -> getBacklogQuota(topicName.getNamespace(),policyPath));
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.debug("Topic policies cache have not init, will apply the namespace backlog quota: topicName={}",
+                    topicName);
         } catch (Exception e) {
-            log.warn("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}", topicName, e);
+            log.error("Failed to read topic policies data, "
+                    + "will apply the namespace backlog quota: topicName={}", topicName, e);
         }
         return getBacklogQuota(topicName.getNamespace(),policyPath);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4a18585..b7eca90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1182,19 +1182,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             OffloadPolicies topicLevelOffloadPolicies = null;
 
             if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
-                TopicName cloneTopicName = topicName;
-                if (topicName.isPartitioned()) {
-                    cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-                }
                 try {
-                    TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+                    TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                     if (topicPolicies != null) {
                         persistencePolicies = topicPolicies.getPersistence();
                         retentionPolicies = topicPolicies.getRetentionPolicies();
                         topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
                     }
                 } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-                    log.warn("Topic {} policies cache have not init.", topicName);
+                    log.debug("Topic {} policies have not been initialized yet.", topicName);
                 }
             }
 
@@ -2473,15 +2469,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
      * @return TopicPolicies is exist else return null.
      */
     public TopicPolicies getTopicPolicies(TopicName topicName) {
-        TopicName cloneTopicName = topicName;
-        if (topicName.isPartitioned()) {
-            cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-        }
         try {
             checkTopicLevelPolicyEnable();
-            return pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+            return pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
         } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
+            log.debug("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
             return null;
         } catch (RestException | NullPointerException e) {
             log.warn("Topic level policies are not enabled. " +
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 18388f1..3dc5a10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -92,7 +92,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                                 .domain(topicName.getDomain().toString())
                                 .tenant(topicName.getTenant())
                                 .namespace(topicName.getNamespaceObject().getLocalName())
-                                .topic(topicName.getLocalName())
+                                .topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
                                 .policies(policies)
                                 .build())
                         .build()).whenComplete(((messageId, e) -> {
@@ -148,7 +148,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
             throw new TopicPoliciesCacheNotInitException();
         }
-        return policiesCache.get(topicName);
+        return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index a1f226f..efd1a33 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -233,6 +234,30 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testBacklogQuotaWithPartitionedTopic() throws Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 3);
+        pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
+        Awaitility.await()
+                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        BacklogQuota backlogQuota = new BacklogQuota(1234, BacklogQuota.RetentionPolicy.producer_exception);
+        admin.topics().setBacklogQuota(topic, backlogQuota);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(topic)
+                        .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
+
+        TopicPolicies topicPolicies0 = pulsar.getTopicPoliciesService().getTopicPolicies(
+                TopicName.get(topic).getPartition(0));
+        TopicPolicies topicPolicies1 = pulsar.getTopicPoliciesService().getTopicPolicies(
+                TopicName.get(topic).getPartition(1));
+        Assert.assertEquals(topicPolicies0, topicPolicies1);
+        Assert.assertEquals(topicPolicies0.getBackLogQuotaMap()
+                .get(BacklogQuota.BacklogQuotaType.destination_storage.name()).getLimit(), backlogQuota.getLimit());
+        Assert.assertEquals(topicPolicies0.getBackLogQuotaMap()
+                .get(BacklogQuota.BacklogQuotaType.destination_storage.name()).getPolicy(), backlogQuota.getPolicy());
+    }
+
+    @Test
     public void testCheckRetention() throws Exception {
         BacklogQuota backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 214fb1e..9801311 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -593,8 +593,6 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         } catch (Exception e) {
             fail("should not fail");
-        } finally {
-            pulsarClient.close();
         }
     }