You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/13 01:15:06 UTC
[pulsar] branch branch-2.9 updated: Move into future stage to catch the exception (#17556)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 3e632fe5e03 Move into future stage to catch the exception (#17556)
3e632fe5e03 is described below
commit 3e632fe5e03d2a3c756ecf6c1ba7b5afa0ec16c6
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Sep 10 13:48:08 2022 +0800
Move into future stage to catch the exception (#17556)
(cherry picked from commit 12a6cc46bcde943bfb08fa83a9822b81f20508d9)
---
.../SystemTopicBasedTopicPoliciesService.java | 37 ++++++++++++----------
1 file changed, 20 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 1c018141150..a5bcaa5728d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -359,23 +359,26 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
- reader.readNextAsync().whenComplete((msg, ex) -> {
- if (ex == null) {
- refreshTopicPoliciesCache(msg);
- notifyListener(msg);
- readMorePolicies(reader);
- } else {
- Throwable cause = FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof PulsarClientException.AlreadyClosedException) {
- log.error("Read more topic policies exception, close the read now!", ex);
- cleanCacheAndCloseReader(
- reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
- } else {
- log.warn("Read more topic polices exception, read again.", ex);
- readMorePolicies(reader);
- }
- }
- });
+ reader.readNextAsync()
+ .thenAccept(msg -> {
+ refreshTopicPoliciesCache(msg);
+ notifyListener(msg);
+ })
+ .whenComplete((__, ex) -> {
+ if (ex == null) {
+ readMorePolicies(reader);
+ } else {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof PulsarClientException.AlreadyClosedException) {
+ log.error("Read more topic policies exception, close the read now!", ex);
+ cleanCacheAndCloseReader(
+ reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+ } else {
+ log.warn("Read more topic polices exception, read again.", ex);
+ readMorePolicies(reader);
+ }
+ }
+ });
}
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {