You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/09/13 01:44:12 UTC

[pulsar] branch branch-2.9 updated: [fix][broker] Topic policy reader can't recover when get any exception (#17562)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new db8480b66f0 [fix][broker] Topic policy reader can't recover when get any exception (#17562)
db8480b66f0 is described below

commit db8480b66f0634193e85de01358a1e34648d5595
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Tue Sep 13 09:06:25 2022 +0800

    [fix][broker] Topic policy reader can't recover when get any exception (#17562)
    
    (cherry picked from commit 5aa1f1101c6fef61163b34558cc98bf362dfa969)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 28 ++++++++++++----------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index a5bcaa5728d..38ec5d803c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -230,21 +231,20 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
             ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    initPolicesCache(reader, result);
-                    result.thenRun(() -> readMorePolicies(reader));
-                }
+            readerCompletableFuture.thenAccept(reader -> {
+                initPolicesCache(reader, result);
+                result.thenRun(() -> readMorePolicies(reader));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+                cleanCacheAndCloseReader(namespace, false);
+                result.completeExceptionally(ex);
+                return null;
             });
         }
     }
@@ -346,14 +346,18 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         });
     }
 
-    private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) {
+    private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
         CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
         policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
         if (cleanOwnedBundlesCount) {
             ownedBundlesCountPerNamespace.remove(namespace);
         }
         if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
-            readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+            readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
+                    .exceptionally(ex -> {
+                        log.warn("[{}] Close change_event reader fail.", namespace, ex);
+                        return null;
+                    });
         }
         policyCacheInitMap.remove(namespace);
     }