Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/27 13:55:47 UTC

[pulsar] branch master updated: [BUG] Catch topic policy not hit exception in handleSubscribe (#10341)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 608e36e  [BUG] Catch topic policy not hit exception in handleSubscribe (#10341)
608e36e is described below

commit 608e36ee99655d7f139fdc77e8edff7438722b27
Author: hangc0276 <ch...@apache.org>
AuthorDate: Thu May 27 21:55:13 2021 +0800

    [BUG] Catch topic policy not hit exception in handleSubscribe (#10341)
    
    ### Motivation
    When running KoP (Kafka on Pulsar) with topic policies enabled, the broker fails with the following exception.
    ```
    17:09:46.777 [mock-pulsar-bk:org.apache.pulsar.broker.service.ServerCnx@1010] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:55980][persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to create consumer: consumerId=0, Topic policies cache have not init.
    java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
            at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172]
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172]
            at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$addEntry$32(BookkeeperSchemaStorage.java:566) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
            at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncAddEntry$5(PulsarMockLedgerHandle.java:194) ~[testmocks-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
            at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_172]
            at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_172]
            at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_172]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
    Caused by: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
            at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkSubscriptionTypesEnable(PersistentTopic.java:2872) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:648) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
            at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:966) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
            at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_172]
            ... 15 more
    17:09:46.801 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ClientCnx@682] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xb9d000f5, L:/127.0.0.1:55980 - R:localhost/127.0.0.1:15002] Received error from server: Topic policies cache have not init.
    17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConsumerImpl@770] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to subscribe to topic on localhost/127.0.0.1:15002
    17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConnectionHandler@102] WARN  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/test/localhost:15000/__change_events-partition-0] [multiTopicsReader-c3d8054591] Could not get connection to broker: Topic policies cache have not init. -- Will try again in 0.1 s
    ```
    The reason is that `checkSubscriptionTypesEnable` does not catch the exception thrown by `getTopicPolicies`, which causes the subscribe request to fail.
    ```Java
    public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
        TopicName topicName = TopicName.get(topic);
        if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
            TopicPolicies topicPolicies =
                    brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
            if (topicPolicies == null) {
                return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
            } else {
                if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
                    return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
                }
                return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
            }
        } else {
            return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
        }
    }
    ```
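    
    The stack trace is consistent with a failure inside a `thenCompose` stage being relayed to the caller as a `CompletionException`. The following is a minimal sketch of that propagation, not the broker's actual code: `CacheNotInitException`, `checkSubscriptionTypesEnable`, `subscribe`, and `subscribeAndCaptureFailure` are simplified, hypothetical stand-ins.

    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;

    class UncaughtPolicyLookupSketch {
        // Hypothetical stand-in for BrokerServiceException.TopicPoliciesCacheNotInitException.
        static class CacheNotInitException extends Exception {
            CacheNotInitException() {
                super("Topic policies cache have not init.");
            }
        }

        // Stand-in for the unguarded check: the policy lookup throws and nothing catches it here.
        static boolean checkSubscriptionTypesEnable() throws Exception {
            throw new CacheNotInitException();
        }

        // Stand-in for subscribe(): any exception from the check fails the returned future.
        static CompletableFuture<String> subscribe() {
            CompletableFuture<String> future = new CompletableFuture<>();
            try {
                checkSubscriptionTypesEnable();
                future.complete("subscribed");
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
            return future;
        }

        // Runs subscribe() as a thenCompose stage and returns the failure the caller observes.
        static Throwable subscribeAndCaptureFailure() {
            try {
                CompletableFuture.completedFuture("topic").thenCompose(topic -> subscribe()).join();
                return null;
            } catch (CompletionException e) {
                return e;
            }
        }

        public static void main(String[] args) {
            Throwable failure = subscribeAndCaptureFailure();
            System.out.println(failure.getClass().getSimpleName()
                    + " caused by " + failure.getCause().getClass().getSimpleName());
        }
    }
    ```

    In this sketch the caller only sees the wrapper, and the original exception is available through `getCause()`, which matches the `Caused by:` section of the log.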
    
    ### Changes
    1. Catch `TopicPoliciesCacheNotInitException` in `checkSubscriptionTypesEnable` and fall back to the namespace- and broker-level check.
    2. Expose the exception stack trace in `ServerCnx#handleSubscribe`.
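    
    The fallback in change 1 can be sketched in isolation as follows. This is an illustration under stated assumptions, not the code in `PersistentTopic`: `SubType`, `CacheNotInitException`, `TopicPolicyLookup`, and `isAllowed` are hypothetical stand-ins, and the `fallback` set plays the role of `checkNsAndBrokerSubscriptionTypesEnable`.

    ```java
    import java.util.EnumSet;
    import java.util.Set;

    class SubscriptionTypeFallbackSketch {
        // Simplified stand-in for the subscription type enum.
        enum SubType { Exclusive, Shared, Failover, Key_Shared }

        // Hypothetical stand-in for BrokerServiceException.TopicPoliciesCacheNotInitException.
        static class CacheNotInitException extends Exception {
            CacheNotInitException() {
                super("Topic policies cache have not init.");
            }
        }

        // Hypothetical lookup: returns null when the topic has no policies,
        // and throws when the policy cache is not initialized yet.
        interface TopicPolicyLookup {
            Set<SubType> subscriptionTypesEnabled() throws CacheNotInitException;
        }

        // Uses the topic-level setting when it is available and non-empty;
        // otherwise, including when the cache is not ready, uses the fallback set.
        static boolean isAllowed(SubType subType, TopicPolicyLookup lookup, Set<SubType> fallback) {
            try {
                Set<SubType> topicLevel = lookup.subscriptionTypesEnabled();
                if (topicLevel == null || topicLevel.isEmpty()) {
                    return fallback.contains(subType);
                }
                return topicLevel.contains(subType);
            } catch (CacheNotInitException e) {
                // Cache not initialized: do not fail the subscribe, defer to the fallback.
                return fallback.contains(subType);
            }
        }

        public static void main(String[] args) {
            Set<SubType> fallback = EnumSet.of(SubType.Exclusive, SubType.Shared);
            TopicPolicyLookup notReady = () -> { throw new CacheNotInitException(); };
            System.out.println(isAllowed(SubType.Shared, notReady, fallback));
            System.out.println(isAllowed(SubType.Key_Shared, notReady, fallback));
        }
    }
    ```

    The design choice is that an uninitialized cache is treated the same as "no topic-level policy", so the subscribe request is decided by the broader settings instead of being rejected.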
---
 .../broker/service/persistent/PersistentTopic.java | 18 ++++++++++------
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 25 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 792b71b..7ce0578 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3024,15 +3024,19 @@ public class PersistentTopic extends AbstractTopic
     public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
         TopicName topicName = TopicName.get(topic);
         if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
-            TopicPolicies topicPolicies =
-                    brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
-            if (topicPolicies == null) {
-                return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
-            } else {
-                if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
+            try {
+                TopicPolicies topicPolicies =
+                        brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
+                if (topicPolicies == null) {
                     return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
+                } else {
+                    if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
+                        return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
+                    }
+                    return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
                 }
-                return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
+            } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+                return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
             }
         } else {
             return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index c892a43..7a9fb06 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2159,4 +2159,29 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), brokerPolicy);
     }
 
+    @Test(timeOut = 30000)
+    public void testProduceConsumeOnTopicPolicy() {
+        final String msg = "send message ";
+        int numMsg = 10;
+        try {
+            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(persistenceTopic)
+                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscriptionName("test").subscribe();
+
+            Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(persistenceTopic).create();
+
+            for (int i = 0; i < numMsg; ++i) {
+                producer.newMessage().value(msg + i).send();
+            }
+
+            for (int i = 0; i < numMsg; ++i) {
+                Message<String> message = consumer.receive(100, TimeUnit.MILLISECONDS);
+                Assert.assertEquals(message.getValue(), msg + i);
+            }
+        } catch (PulsarClientException e) {
+            log.error("Failed to send/produce message, ", e);
+            Assert.fail();
+        }
+    }
+
 }