You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/27 22:39:32 UTC

(pulsar) 04/05: [fix][broker] Avoid executing prepareInitPoliciesCacheAsync if namespace is deleted (#22268)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d4610a0d60139f8660c9d58bf2d8a7981f03d19a
Author: hanmz <gu...@tencent.com>
AuthorDate: Mon Mar 18 06:45:02 2024 +0800

    [fix][broker] Avoid executing prepareInitPoliciesCacheAsync if namespace is deleted (#22268)
    
    (cherry picked from commit 96d77f7e1d5b9c56070eaed5c31213a8144871d3)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 66 +++++++++++++---------
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 19 +++++++
 2 files changed, 58 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 71f78e21f93..4e9e875bcf4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -324,34 +324,46 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         }
     }
 
-    private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+    @VisibleForTesting
+    @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
         requireNonNull(namespace);
-        return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
-            final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
-                    createSystemTopicClient(namespace);
-            readerCaches.put(namespace, readerCompletableFuture);
-            ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
-            final CompletableFuture<Void> initFuture = readerCompletableFuture
-                    .thenCompose(reader -> {
-                        final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
-                        initPolicesCache(reader, stageFuture);
-                        return stageFuture
-                                // Read policies in background
-                                .thenAccept(__ -> readMorePoliciesAsync(reader));
-                    });
-            initFuture.exceptionally(ex -> {
-                try {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    cleanCacheAndCloseReader(namespace, false);
-                } catch (Throwable cleanupEx) {
-                    // Adding this catch to avoid break callback chain
-                    log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx);
-                }
-                return null;
-            });
-            // let caller know we've got an exception.
-            return initFuture;
-        });
+        return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
+                        .thenCompose(namespacePolicies -> {
+                            if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
+                                log.info("[{}] skip prepare init policies cache since the namespace is deleted",
+                                        namespace);
+                                return CompletableFuture.completedFuture(null);
+                            }
+
+                            return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+                                final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
+                                        createSystemTopicClient(namespace);
+                                readerCaches.put(namespace, readerCompletableFuture);
+                                ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
+                                final CompletableFuture<Void> initFuture = readerCompletableFuture
+                                        .thenCompose(reader -> {
+                                            final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
+                                            initPolicesCache(reader, stageFuture);
+                                            return stageFuture
+                                                    // Read policies in background
+                                                    .thenAccept(__ -> readMorePoliciesAsync(reader));
+                                        });
+                                initFuture.exceptionally(ex -> {
+                                    try {
+                                        log.error("[{}] Failed to create reader on __change_events topic",
+                                                namespace, ex);
+                                        cleanCacheAndCloseReader(namespace, false);
+                                    } catch (Throwable cleanupEx) {
+                                        // Adding this catch to avoid break callback chain
+                                        log.error("[{}] Failed to cleanup reader on __change_events topic",
+                                                namespace, cleanupEx);
+                                    }
+                                    return null;
+                                });
+                                // let caller know we've got an exception.
+                                return initFuture;
+                            });
+                        });
     }
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 343a18da6d8..e571da13435 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -71,6 +71,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
 
     private static final String NAMESPACE4 = "system-topic/namespace-4";
 
+    private static final String NAMESPACE5 = "system-topic/namespace-5";
+
     private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1");
     private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2");
     private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1");
@@ -464,4 +466,21 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         admin.namespaces().deleteNamespace(NAMESPACE4);
         Assert.assertNull(service.getWriterCaches().synchronous().getIfPresent(NamespaceName.get(NAMESPACE4)));
     }
+
+    @Test
+    public void testPrepareInitPoliciesCacheAsyncWhenNamespaceBeingDeleted() throws Exception {
+        SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+        admin.namespaces().createNamespace(NAMESPACE5);
+
+        NamespaceName namespaceName = NamespaceName.get(NAMESPACE5);
+        pulsar.getPulsarResources().getNamespaceResources().setPolicies(namespaceName,
+                old -> {
+                    old.deleted = true;
+                    return old;
+                });
+
+        assertNull(service.getPoliciesCacheInit(namespaceName));
+        service.prepareInitPoliciesCacheAsync(namespaceName).get();
+        admin.namespaces().deleteNamespace(NAMESPACE5);
+    }
 }