You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/27 22:39:32 UTC
(pulsar) 04/05: [fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is deleted (#22268)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d4610a0d60139f8660c9d58bf2d8a7981f03d19a
Author: hanmz <gu...@tencent.com>
AuthorDate: Mon Mar 18 06:45:02 2024 +0800
[fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespace is deleted (#22268)
(cherry picked from commit 96d77f7e1d5b9c56070eaed5c31213a8144871d3)
---
.../SystemTopicBasedTopicPoliciesService.java | 66 +++++++++++++---------
.../SystemTopicBasedTopicPoliciesServiceTest.java | 19 +++++++
2 files changed, 58 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 71f78e21f93..4e9e875bcf4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -324,34 +324,46 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
}
- private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+ @VisibleForTesting
+ @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
requireNonNull(namespace);
- return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
- final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
- createSystemTopicClient(namespace);
- readerCaches.put(namespace, readerCompletableFuture);
- ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
- final CompletableFuture<Void> initFuture = readerCompletableFuture
- .thenCompose(reader -> {
- final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
- initPolicesCache(reader, stageFuture);
- return stageFuture
- // Read policies in background
- .thenAccept(__ -> readMorePoliciesAsync(reader));
- });
- initFuture.exceptionally(ex -> {
- try {
- log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
- cleanCacheAndCloseReader(namespace, false);
- } catch (Throwable cleanupEx) {
- // Adding this catch to avoid break callback chain
- log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx);
- }
- return null;
- });
- // let caller know we've got an exception.
- return initFuture;
- });
+ return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
+ .thenCompose(namespacePolicies -> {
+ if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
+ log.info("[{}] skip prepare init policies cache since the namespace is deleted",
+ namespace);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+ final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
+ createSystemTopicClient(namespace);
+ readerCaches.put(namespace, readerCompletableFuture);
+ ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
+ final CompletableFuture<Void> initFuture = readerCompletableFuture
+ .thenCompose(reader -> {
+ final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
+ initPolicesCache(reader, stageFuture);
+ return stageFuture
+ // Read policies in background
+ .thenAccept(__ -> readMorePoliciesAsync(reader));
+ });
+ initFuture.exceptionally(ex -> {
+ try {
+ log.error("[{}] Failed to create reader on __change_events topic",
+ namespace, ex);
+ cleanCacheAndCloseReader(namespace, false);
+ } catch (Throwable cleanupEx) {
+ // Adding this catch to avoid break callback chain
+ log.error("[{}] Failed to cleanup reader on __change_events topic",
+ namespace, cleanupEx);
+ }
+ return null;
+ });
+ // let caller know we've got an exception.
+ return initFuture;
+ });
+ });
}
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 343a18da6d8..e571da13435 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -71,6 +71,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
private static final String NAMESPACE4 = "system-topic/namespace-4";
+ private static final String NAMESPACE5 = "system-topic/namespace-5";
+
private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1");
private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2");
private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1");
@@ -464,4 +466,21 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
admin.namespaces().deleteNamespace(NAMESPACE4);
Assert.assertNull(service.getWriterCaches().synchronous().getIfPresent(NamespaceName.get(NAMESPACE4)));
}
+
+ @Test
+ public void testPrepareInitPoliciesCacheAsyncWhenNamespaceBeingDeleted() throws Exception {
+ SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+ admin.namespaces().createNamespace(NAMESPACE5);
+
+ NamespaceName namespaceName = NamespaceName.get(NAMESPACE5);
+ pulsar.getPulsarResources().getNamespaceResources().setPolicies(namespaceName,
+ old -> {
+ old.deleted = true;
+ return old;
+ });
+
+ assertNull(service.getPoliciesCacheInit(namespaceName));
+ service.prepareInitPoliciesCacheAsync(namespaceName).get();
+ admin.namespaces().deleteNamespace(NAMESPACE5);
+ }
}