You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/10 05:52:24 UTC

[pulsar] branch branch-2.11 updated: Move into future stage to catch the exception (#17556)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 4dc4c71cf8a Move  into future stage to catch the exception (#17556)
4dc4c71cf8a is described below

commit 4dc4c71cf8a66657704cfa7c14ccaa684dc98f5c
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Sep 10 13:48:08 2022 +0800

    Move  into future stage to catch the exception (#17556)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 37 ++++++++++++----------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index ed69428386a..93f97bbce07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -380,23 +380,26 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     }
 
     private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
-        reader.readNextAsync().whenComplete((msg, ex) -> {
-            if (ex == null) {
-                refreshTopicPoliciesCache(msg);
-                notifyListener(msg);
-                readMorePolicies(reader);
-            } else {
-                Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                if (cause instanceof PulsarClientException.AlreadyClosedException) {
-                    log.error("Read more topic policies exception, close the read now!", ex);
-                    cleanCacheAndCloseReader(
-                            reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    log.warn("Read more topic polices exception, read again.", ex);
-                    readMorePolicies(reader);
-                }
-            }
-        });
+        reader.readNextAsync()
+              .thenAccept(msg -> {
+                  refreshTopicPoliciesCache(msg);
+                  notifyListener(msg);
+              })
+              .whenComplete((__, ex) -> {
+                  if (ex == null) {
+                      readMorePolicies(reader);
+                  } else {
+                      Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                      if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                          log.error("Read more topic policies exception, close the read now!", ex);
+                          cleanCacheAndCloseReader(
+                                  reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+                      } else {
+                          log.warn("Read more topic polices exception, read again.", ex);
+                          readMorePolicies(reader);
+                      }
+                  }
+              });
     }
 
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {