You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/09/13 01:15:16 UTC

[pulsar] branch branch-2.10 updated: Move into future stage to catch the exception (#17556)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 2a31b8b0806 Move  into future stage to catch the exception (#17556)
2a31b8b0806 is described below

commit 2a31b8b080699efd64f5edc2685ef06f40e4c4ca
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Sep 10 13:48:08 2022 +0800

    Move policy cache refresh and listener notification into a future stage to catch the exception (#17556)
    
    (cherry picked from commit 12a6cc46bcde943bfb08fa83a9822b81f20508d9)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 37 ++++++++++++----------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index fb4d6f056ed..38a49df06b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -385,23 +385,26 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     }
 
     private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
-        reader.readNextAsync().whenComplete((msg, ex) -> {
-            if (ex == null) {
-                refreshTopicPoliciesCache(msg);
-                notifyListener(msg);
-                readMorePolicies(reader);
-            } else {
-                Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                if (cause instanceof PulsarClientException.AlreadyClosedException) {
-                    log.error("Read more topic policies exception, close the read now!", ex);
-                    cleanCacheAndCloseReader(
-                            reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    log.warn("Read more topic polices exception, read again.", ex);
-                    readMorePolicies(reader);
-                }
-            }
-        });
+        reader.readNextAsync()
+              .thenAccept(msg -> {
+                  refreshTopicPoliciesCache(msg);
+                  notifyListener(msg);
+              })
+              .whenComplete((__, ex) -> {
+                  if (ex == null) {
+                      readMorePolicies(reader);
+                  } else {
+                      Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                      if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                          log.error("Read more topic policies exception, close the read now!", ex);
+                          cleanCacheAndCloseReader(
+                                  reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+                      } else {
+                          log.warn("Read more topic polices exception, read again.", ex);
+                          readMorePolicies(reader);
+                      }
+                  }
+              });
     }
 
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {