Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/23 09:31:05 UTC

[GitHub] [pulsar] hangc0276 opened a new pull request #10341: 【BUG】Catch topic policy not hit exception in handleSubscribe

hangc0276 opened a new pull request #10341:
URL: https://github.com/apache/pulsar/pull/10341


   ### Motivation
   When running KOP with topic policies enabled, the broker throws the following exception.
   ```
   17:09:46.777 [mock-pulsar-bk:org.apache.pulsar.broker.service.ServerCnx@1010] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:55980][persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to create consumer: consumerId=0, Topic policies cache have not init.
   java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
           at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172]
           at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172]
           at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$addEntry$32(BookkeeperSchemaStorage.java:566) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncAddEntry$5(PulsarMockLedgerHandle.java:194) ~[testmocks-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_172]
           at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_172]
           at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_172]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
   Caused by: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
           at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkSubscriptionTypesEnable(PersistentTopic.java:2872) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:648) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:966) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
           at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_172]
           ... 15 more
   17:09:46.801 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ClientCnx@682] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xb9d000f5, L:/127.0.0.1:55980 - R:localhost/127.0.0.1:15002] Received error from server: Topic policies cache have not init.
   17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConsumerImpl@770] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to subscribe to topic on localhost/127.0.0.1:15002
   17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConnectionHandler@102] WARN  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/test/localhost:15000/__change_events-partition-0] [multiTopicsReader-c3d8054591] Could not get connection to broker: Topic policies cache have not init. -- Will try again in 0.1 s
   ```
   The cause is that `checkSubscriptionTypesEnable` does not catch the exception thrown by `getTopicPolicies`, so the subscribe request fails.
   ```Java
   public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
       TopicName topicName = TopicName.get(topic);
       if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
           // getTopicPolicies can throw TopicPoliciesCacheNotInitException; it is not caught here.
           TopicPolicies topicPolicies =
                   brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
           if (topicPolicies == null) {
               return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
           } else {
               if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
                   return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
               }
               return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
           }
       } else {
           return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
       }
   }
   ```
   
   ### Changes
   1. Catch the exception in `checkSubscriptionTypesEnable`.
   2. Log the exception stack trace in `ServerCnx#handleSubscribe`.
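
   The first change listed above could take roughly the following shape. This is a self-contained sketch, not the code from this PR: the nested types are hypothetical stand-ins for the Pulsar classes of the same name, the `nsOrBrokerEnabled` parameter stands in for `checkNsAndBrokerSubscriptionTypesEnable`, and falling back to the namespace/broker-level check when the cache is not initialized is an assumption about how the exception is handled.

   ```java
   import java.util.EnumSet;
   import java.util.Set;

   public class SubscriptionTypeCheckSketch {

       /** Hypothetical stand-in for the subscription type enum. */
       enum SubType { Exclusive, Shared, Failover, Key_Shared }

       /** Hypothetical stand-in for BrokerServiceException.TopicPoliciesCacheNotInitException. */
       static class TopicPoliciesCacheNotInitException extends Exception {
           TopicPoliciesCacheNotInitException() {
               super("Topic policies cache have not init.");
           }
       }

       /** Hypothetical stand-in for TopicPolicies, reduced to the one field used here. */
       static class TopicPolicies {
           private final Set<SubType> subscriptionTypesEnabled;

           TopicPolicies(Set<SubType> enabled) {
               this.subscriptionTypesEnabled = enabled;
           }

           Set<SubType> getSubscriptionTypesEnabled() {
               return subscriptionTypesEnabled;
           }
       }

       /** Hypothetical stand-in for the topic policies service. */
       interface TopicPoliciesService {
           TopicPolicies getTopicPolicies(String topic) throws TopicPoliciesCacheNotInitException;
       }

       static boolean checkSubscriptionTypesEnable(TopicPoliciesService service, String topic,
                                                   SubType subType, Set<SubType> nsOrBrokerEnabled) {
           TopicPolicies topicPolicies = null;
           try {
               topicPolicies = service.getTopicPolicies(topic);
           } catch (TopicPoliciesCacheNotInitException e) {
               // Assumption: a cache that is not ready is treated like "no topic-level policy",
               // so the subscribe path does not fail on this exception.
           }
           if (topicPolicies == null || topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
               // Fall back to the namespace/broker-level setting.
               return nsOrBrokerEnabled.contains(subType);
           }
           return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
       }

       public static void main(String[] args) {
           // A service whose cache is never ready: every lookup throws.
           TopicPoliciesService notReady = topic -> {
               throw new TopicPoliciesCacheNotInitException();
           };
           Set<SubType> fallback = EnumSet.of(SubType.Exclusive, SubType.Shared);
           System.out.println(checkSubscriptionTypesEnable(notReady, "my-topic", SubType.Shared, fallback));
       }
   }
   ```

   With this shape, a subscribe request that arrives before the cache is ready is decided by the fallback setting alone instead of being rejected.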


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #10341: [BUG] Catch topic policy not hit exception in handleSubscribe

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #10341:
URL: https://github.com/apache/pulsar/pull/10341#issuecomment-836629513


   > @hangc0276 Can you address Matteo's comment?
   
   @sijie OK, I will address it.





[GitHub] [pulsar] eolivelli edited a comment on pull request #10341: 【BUG】Catch topic policy not hit exception in handleSubscribe

Posted by GitBox <gi...@apache.org>.
eolivelli edited a comment on pull request #10341:
URL: https://github.com/apache/pulsar/pull/10341#issuecomment-825551733


   Is it possible to add a test case?





[GitHub] [pulsar] codelipenghui merged pull request #10341: [BUG] Catch topic policy not hit exception in handleSubscribe

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10341:
URL: https://github.com/apache/pulsar/pull/10341


   





[GitHub] [pulsar] eolivelli commented on pull request #10341: 【BUG】Catch topic policy not hit exception in handleSubscribe

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10341:
URL: https://github.com/apache/pulsar/pull/10341#issuecomment-825551733


   Is it possible to add a test case?





[GitHub] [pulsar] sijie commented on pull request #10341: [BUG] Catch topic policy not hit exception in handleSubscribe

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #10341:
URL: https://github.com/apache/pulsar/pull/10341#issuecomment-831654175


   @hangc0276 Can you address Matteo's comment?





[GitHub] [pulsar] merlimat commented on a change in pull request #10341: 【BUG】Catch topic policy not hit exception in handleSubscribe

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10341:
URL: https://github.com/apache/pulsar/pull/10341#discussion_r619555247



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1015,12 +1015,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                                                     "[{}][{}][{}] Failed to create consumer because exclusive consumer"
                                                             + " is already connected: {}",
                                                     remoteAddress, topicName, subscriptionName,
-                                                    exception.getCause().getMessage());
+                                                    exception.getCause().getMessage(), exception);
                                         }
                                     } else if (exception.getCause() instanceof BrokerServiceException) {
                                         log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
                                                  remoteAddress, topicName, subscriptionName,
-                                                 consumerId,  exception.getCause().getMessage());
+                                                 consumerId,  exception.getCause().getMessage(), exception);

Review comment:
       This was done on purpose to avoid stack traces for "known" or expected exceptions.
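
       The convention described in this comment can be illustrated with a small standalone sketch. It uses `java.util.logging` rather than the broker's own logger, and `KnownServiceException` and `logFailure` are hypothetical names introduced only for illustration: expected exceptions are logged with their message only, while anything else is logged with the full stack trace.

       ```java
       import java.util.logging.Level;
       import java.util.logging.Logger;

       public class KnownExceptionLoggingSketch {
           private static final Logger LOG =
                   Logger.getLogger(KnownExceptionLoggingSketch.class.getName());

           /** Hypothetical stand-in for a "known" or expected broker-side exception. */
           static class KnownServiceException extends RuntimeException {
               KnownServiceException(String message) {
                   super(message);
               }
           }

           /** Logs a failure and returns true when the stack trace was included. */
           static boolean logFailure(String topic, Throwable cause) {
               if (cause instanceof KnownServiceException) {
                   // Expected failure: message only, no stack trace in the log.
                   LOG.log(Level.WARNING, "[{0}] Failed to create consumer: {1}",
                           new Object[] {topic, cause.getMessage()});
                   return false;
               }
               // Unexpected failure: pass the Throwable so the stack trace is logged.
               LOG.log(Level.WARNING, "[" + topic + "] Failed to create consumer", cause);
               return true;
           }

           public static void main(String[] args) {
               logFailure("my-topic", new KnownServiceException("Topic policies cache have not init."));
               logFailure("my-topic", new IllegalStateException("unexpected"));
           }
       }
       ```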



