You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/12/23 14:41:14 UTC

[pulsar] 08/08: Fixes the NPE in system topics policies service (#13469)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1a32edc69eb017d1a95f19708a029aab9aeddbf8
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Dec 23 19:27:23 2021 +0800

    Fixes the NPE in system topics policies service (#13469)
    
    ---
    
    *Motivation*
    
    The `namespaceEventsSystemTopicFactory` is created when you will
    use it. But the `createSystemTopicFactoryIfNeeded()` may failed
    which will cause the `namespaceEventsSystemTopicFactory` is null
    and throw a NPE error from the method.
    
    *Modifications*
    
    - throw the error and failed the method when there has exceptions in
    `createSystemTopicFactoryIfNeeded()`
    
    (cherry picked from commit 4022b2884f46bb5e1593da419bb226ad1e0fc768)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 1cc71e9..0b305ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -88,9 +88,13 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
                                                          TopicPolicies policies) {
-        createSystemTopicFactoryIfNeeded();
-
         CompletableFuture<Void> result = new CompletableFuture<>();
+        try {
+            createSystemTopicFactoryIfNeeded();
+        } catch (PulsarServerException e) {
+            result.completeExceptionally(e);
+            return result;
+        }
 
         SystemTopicClient<PulsarEvent> systemTopicClient =
                 namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
@@ -186,8 +190,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     @Override
     public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) {
         CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
-        createSystemTopicFactoryIfNeeded();
-        if (namespaceEventsSystemTopicFactory == null) {
+        try {
+            createSystemTopicFactoryIfNeeded();
+        } catch (PulsarServerException e) {
             result.complete(null);
             return result;
         }
@@ -206,7 +211,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             result.complete(null);
             return result;
         }
-        createSystemTopicFactoryIfNeeded();
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
@@ -240,9 +244,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry(
             NamespaceName namespace) {
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
+        try {
+            createSystemTopicFactoryIfNeeded();
+        } catch (PulsarServerException e) {
+            result.completeExceptionally(e);
+            return result;
+        }
         SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
                 .createTopicPoliciesSystemTopicClient(namespace);
-        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
         Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
         RetryUtil.retryAsynchronously(() -> {
             try {
@@ -389,6 +399,12 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                     // However, due to compatibility, it is temporarily retained here
                     // and can be deleted in the future.
                     policiesCache.remove(topicName);
+                    try {
+                        createSystemTopicFactoryIfNeeded();
+                    } catch (PulsarServerException e) {
+                        log.error("Failed to create system topic factory");
+                        break;
+                    }
                     SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
                             .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
                     systemTopicClient.newWriterAsync().thenAccept(writer
@@ -408,7 +424,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         }
     }
 
-    private void createSystemTopicFactoryIfNeeded() {
+    private void createSystemTopicFactoryIfNeeded() throws PulsarServerException {
         if (namespaceEventsSystemTopicFactory == null) {
             synchronized (this) {
                 if (namespaceEventsSystemTopicFactory == null) {
@@ -417,6 +433,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                                 new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
                     } catch (PulsarServerException e) {
                         log.error("Create namespace event system topic factory error.", e);
+                        throw e;
                     }
                 }
             }