You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/02 05:36:37 UTC

[pulsar] 01/02: Fix using partitioned topic name to get Policy (#11294)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 46d8e9068046f7d6811292247ccac46571a20d73
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Wed Jul 14 14:43:00 2021 +0800

    Fix using partitioned topic name to get Policy (#11294)
    
    ### Motivation
    In the master branch, the REST API no longer allows the topic name of the partition to be used to set the topic policy, but there are still many places where it will be used internally.
    
    Suppose we set a Topic policy for `persistent://tenant/namespace/topic`
    However, the policy cannot be obtained through `persistent://tenant/namespace/topic-partition-0`, which causes the policy to become invalid.
    
    For example:PersistentTopic.checkSubscriptionTypesEnable
    
    ### Modifications
    Convert the name in SystemTopicBasedTopicPoliciesService
    
    (cherry picked from commit 35d29b9d67df27c9238c15ce66052efff12dedb7)
---
 .../pulsar/broker/service/BrokerService.java       | 12 +----
 .../SystemTopicBasedTopicPoliciesService.java      |  4 +-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 57 +++++++++++++++-------
 3 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2ebaee2..2c8c660 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1331,12 +1331,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             OffloadPoliciesImpl topicLevelOffloadPolicies = null;
 
             if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
-                TopicName cloneTopicName = topicName;
-                if (topicName.isPartitioned()) {
-                    cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-                }
                 try {
-                    TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+                    TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                     if (topicPolicies != null) {
                         persistencePolicies = topicPolicies.getPersistence();
                         retentionPolicies = topicPolicies.getRetentionPolicies();
@@ -2630,12 +2626,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
             return Optional.empty();
         }
-        TopicName cloneTopicName = topicName;
-        if (topicName.isPartitioned()) {
-            cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-        }
         try {
-            return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName));
+            return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName));
         } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
             log.debug("Topic {} policies have not been initialized yet.", topicName.getPartitionedTopicName());
             return Optional.empty();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index a21eb07..49f934a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -104,7 +104,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                                 .domain(topicName.getDomain().toString())
                                 .tenant(topicName.getTenant())
                                 .namespace(topicName.getNamespaceObject().getLocalName())
-                                .topic(topicName.getLocalName())
+                                .topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
                                 .policies(policies)
                                 .build())
                         .build()).whenComplete(((messageId, e) -> {
@@ -154,7 +154,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
             throw new TopicPoliciesCacheNotInitException();
         }
-        return policiesCache.get(topicName);
+        return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index b2a6e83..c40b38a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -18,8 +18,23 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -43,6 +58,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -63,23 +79,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 @Slf4j
 @Test(groups = "broker")
 public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
@@ -2214,6 +2213,30 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         assertNull(admin.topics().getMessageTTL(topic));
     }
 
+    @Test
+    public void testSubscriptionTypesWithPartitionedTopic() throws Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+        pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
+        Awaitility.await()
+                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        Set<SubscriptionType> subscriptionTypeSet = new HashSet<>();
+        subscriptionTypeSet.add(SubscriptionType.Key_Shared);
+        admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
+        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscriptionTypesEnabled(topic)));
+
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicReference(TopicName.get(topic).getPartition(0).toString()).get();
+        Set<String> old = new HashSet<>(pulsar.getConfiguration().getSubscriptionTypesEnabled());
+        try {
+            pulsar.getConfiguration().getSubscriptionTypesEnabled().clear();
+            assertTrue(persistentTopic.checkSubscriptionTypesEnable(CommandSubscribe.SubType.Key_Shared));
+        } finally {
+            //restore
+            pulsar.getConfiguration().getSubscriptionTypesEnabled().addAll(old);
+        }
+    }
+
     @Test(timeOut = 30000)
     public void testSubscriptionTypesEnabled() throws Exception {
         final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();