You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/27 13:55:47 UTC
[pulsar] branch master updated: [BUG] Catch topic policy not hit
exception in handleSubscribe (#10341)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 608e36e [BUG] Catch topic policy not hit exception in handleSubscribe (#10341)
608e36e is described below
commit 608e36ee99655d7f139fdc77e8edff7438722b27
Author: hangc0276 <ch...@apache.org>
AuthorDate: Thu May 27 21:55:13 2021 +0800
[BUG] Catch topic policy not hit exception in handleSubscribe (#10341)
### Motivation
When running KOP with topic policy, it get the following exception.
```
17:09:46.777 [mock-pulsar-bk:org.apache.pulsar.broker.service.ServerCnx@1010] WARN org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:55980][persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to create consumer: consumerId=0, Topic policies cache have not init.
java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172]
at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$addEntry$32(BookkeeperSchemaStorage.java:566) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncAddEntry$5(PulsarMockLedgerHandle.java:194) ~[testmocks-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_172]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_172]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
Caused by: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkSubscriptionTypesEnable(PersistentTopic.java:2872) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:648) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:966) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_172]
... 15 more
17:09:46.801 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ClientCnx@682] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xb9d000f5, L:/127.0.0.1:55980 - R:localhost/127.0.0.1:15002] Received error from server: Topic policies cache have not init.
17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConsumerImpl@770] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to subscribe to topic on localhost/127.0.0.1:15002
17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConnectionHandler@102] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/test/localhost:15000/__change_events-partition-0] [multiTopicsReader-c3d8054591] Could not get connection to broker: Topic policies cache have not init. -- Will try again in 0.1 s
```
The reason is that it doesn't catch exception in getTopicPolicies, which will lead to subscribe failed.
```Java
public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
TopicName topicName = TopicName.get(topic);
if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
TopicPolicies topicPolicies =
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
if (topicPolicies == null) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
} else {
if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
}
} else {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
}
```
### Changes
1. catch the exception in `checkSubscriptionTypesEnable`
2. expose exception stack in `servercnx#handleSubscribe`
---
.../broker/service/persistent/PersistentTopic.java | 18 ++++++++++------
.../pulsar/broker/admin/TopicPoliciesTest.java | 25 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 792b71b..7ce0578 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3024,15 +3024,19 @@ public class PersistentTopic extends AbstractTopic
public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
TopicName topicName = TopicName.get(topic);
if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
- TopicPolicies topicPolicies =
- brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
- if (topicPolicies == null) {
- return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
- } else {
- if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
+ try {
+ TopicPolicies topicPolicies =
+ brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
+ if (topicPolicies == null) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
+ } else {
+ if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
+ return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
+ }
+ return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
}
- return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
} else {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index c892a43..7a9fb06 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2159,4 +2159,29 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), brokerPolicy);
}
+ @Test(timeOut = 30000)
+ public void testProduceConsumeOnTopicPolicy() {
+ final String msg = "send message ";
+ int numMsg = 10;
+ try {
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(persistenceTopic)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test").subscribe();
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(persistenceTopic).create();
+
+ for (int i = 0; i < numMsg; ++i) {
+ producer.newMessage().value(msg + i).send();
+ }
+
+ for (int i = 0; i < numMsg; ++i) {
+ Message<String> message = consumer.receive(100, TimeUnit.MILLISECONDS);
+ Assert.assertEquals(message.getValue(), msg + i);
+ }
+ } catch (PulsarClientException e) {
+ log.error("Failed to send/produce message, ", e);
+ Assert.fail();
+ }
+ }
+
}