You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/09/13 01:15:16 UTC
[pulsar] branch branch-2.10 updated: Move into future stage to catch the exception (#17556)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 2a31b8b0806 Move into future stage to catch the exception (#17556)
2a31b8b0806 is described below
commit 2a31b8b080699efd64f5edc2685ef06f40e4c4ca
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Sep 10 13:48:08 2022 +0800
Move into future stage to catch the exception (#17556)
(cherry picked from commit 12a6cc46bcde943bfb08fa83a9822b81f20508d9)
---
.../SystemTopicBasedTopicPoliciesService.java | 37 ++++++++++++----------
1 file changed, 20 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index fb4d6f056ed..38a49df06b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -385,23 +385,26 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
- reader.readNextAsync().whenComplete((msg, ex) -> {
- if (ex == null) {
- refreshTopicPoliciesCache(msg);
- notifyListener(msg);
- readMorePolicies(reader);
- } else {
- Throwable cause = FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof PulsarClientException.AlreadyClosedException) {
- log.error("Read more topic policies exception, close the read now!", ex);
- cleanCacheAndCloseReader(
- reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
- } else {
- log.warn("Read more topic polices exception, read again.", ex);
- readMorePolicies(reader);
- }
- }
- });
+ reader.readNextAsync()
+ .thenAccept(msg -> {
+ refreshTopicPoliciesCache(msg);
+ notifyListener(msg);
+ })
+ .whenComplete((__, ex) -> {
+ if (ex == null) {
+ readMorePolicies(reader);
+ } else {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof PulsarClientException.AlreadyClosedException) {
+ log.error("Read more topic policies exception, close the read now!", ex);
+ cleanCacheAndCloseReader(
+ reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+ } else {
+ log.warn("Read more topic polices exception, read again.", ex);
+ readMorePolicies(reader);
+ }
+ }
+ });
}
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {