You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/16 07:31:39 UTC

[pulsar] branch branch-2.7 updated: [Branch 2.7] Fix TopicPoliciesCacheNotInitException issue. (#14293)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 0abf26b  [Branch 2.7] Fix TopicPoliciesCacheNotInitException issue. (#14293)
0abf26b is described below

commit 0abf26bf91744bf6b84cd56d16cf45d0b639e230
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Feb 16 15:30:02 2022 +0800

    [Branch 2.7] Fix TopicPoliciesCacheNotInitException issue. (#14293)
    
    Cherry pick from PR: #12773
    
    ### Motivation
    
    Sometimes we may get a `TopicPoliciesCacheNotInitException` with the stack trace below:
    
    ```java
    15:45:47.020 [pulsar-web-41-3] INFO  org.eclipse.jetty.server.RequestLog - 10.0.0.42 - - [10/Nov/2021:15:45:47 +0000] "GET /status.html HTTP/1.1" 200 2 "-" "kube-probe/1.19+" 1
    15:45:51.221 [pulsar-2-15] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to perform getRetention on topic persistent://public/default/UpdateNodeCharts
    java.lang.RuntimeException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
    	at org.apache.pulsar.broker.service.TopicPoliciesService.lambda$getTopicPoliciesAsyncWithRetry$0(TopicPoliciesService.java:84) ~[io.streamnative-pulsar-broker-2.8.1.21.jar:2.8.1.21]
    	at org.apache.pulsar.client.util.RetryUtil.executeWithRetry(RetryUtil.java:50) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
    	at org.apache.pulsar.client.util.RetryUtil.lambda$executeWithRetry$1(RetryUtil.java:63) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.68.Final.jar:4.1.68.Final]
    	at java.lang.Thread.run(Thread.java:829) [?:?]
    ```
    
    This happens because when `reader.readNextAsync()` completes exceptionally, `msg` is null, so the subsequent code throws an NPE that no catch block handles.
    
    ### Modification
    
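    - Close the reader when `reader.readNextAsync()` fails.
    - Remove the redundant field `policyCacheInitMap`, since `readerCaches` can be used instead (note: this backport still keeps and uses `policyCacheInitMap`).
    - Add retry logic to `getTopicPolicies`: it now starts initializing the namespace's policy cache if that has not been started yet.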
---
 .../SystemTopicBasedTopicPoliciesService.java      | 46 +++++++++++++++-------
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 37 ++++++++++++++++-
 2 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 3dc5a10..0541c7a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -144,6 +144,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     @Override
     public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException {
+        if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
+            NamespaceName namespace = topicName.getNamespaceObject();
+            prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+        }
         if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
                 && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
             throw new TopicPoliciesCacheNotInitException();
@@ -151,6 +155,25 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
+    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+        if (policyCacheInitMap.putIfAbsent(namespace, false) == null) { // only the first caller starts init
+            CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture = namespaceEventsSystemTopicFactory
+                    .createTopicPoliciesSystemTopicClient(namespace).newReaderAsync();
+            readerCaches.put(namespace, readerCompletableFuture);
+            readerCompletableFuture.whenComplete((reader, ex) -> {
+                if (ex != null) { // reader is null here, so there is nothing to close
+                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+                    readerCaches.remove(namespace);
+                    policyCacheInitMap.remove(namespace); // clear before completing so a later call can retry init
+                    result.completeExceptionally(ex);
+                } else {
+                    initPolicesCache(reader, result);
+                    result.thenRun(() -> readMorePolicies(reader));
+                }
+            });
+        }
+    }
+
     @Override
     public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) {
         CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
@@ -180,21 +203,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
                 result.complete(null);
             } else {
-                SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory.createSystemTopic(namespace
-                        , EventType.TOPIC_POLICY);
                 ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
-                policyCacheInitMap.put(namespace, false);
-                CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture = systemTopicClient.newReaderAsync();
-                readerCaches.put(namespace, readerCompletableFuture);
-                readerCompletableFuture.whenComplete((reader, ex) -> {
-                    if (ex != null) {
-                        log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                        result.completeExceptionally(ex);
-                    } else {
-                        initPolicesCache(reader, result);
-                        result.thenRun(() -> readMorePolicies(reader));
-                    }
-                });
+                prepareInitPoliciesCache(namespace, result);
             }
         }
         return result;
@@ -249,14 +259,20 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
                 readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                reader.closeAsync();
+                return;
             }
             if (hasMore) {
                 reader.readNextAsync().whenComplete((msg, e) -> {
                     if (e != null) {
                         log.error("[{}] Failed to read event from the system topic.",
-                                reader.getSystemTopic().getTopicName(), ex);
+                                reader.getSystemTopic().getTopicName(), e);
                         future.completeExceptionally(e);
                         readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                        policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                        reader.closeAsync();
+                        return;
                     }
                     refreshTopicPoliciesCache(msg);
                     if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 9eb8669..c806840 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -33,8 +34,13 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest {
 
@@ -193,4 +199,33 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
         systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
     }
+
+    @Test
+    public void testGetTopicPoliciesWithRetry() throws Exception {
+        Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
+        initMapField.setAccessible(true);
+        Map<NamespaceName, Boolean> initMap = (Map)initMapField.get(systemTopicBasedTopicPoliciesService);
+        initMap.remove(NamespaceName.get(NAMESPACE1)); // simulate a namespace whose cache was never initialized
+        Field readerCaches = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches");
+        readerCaches.setAccessible(true);
+        Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader>> readers = (Map)readerCaches.get(systemTopicBasedTopicPoliciesService);
+        readers.remove(NamespaceName.get(NAMESPACE1));
+        TopicPolicies initPolicy = TopicPolicies.builder()
+                .maxConsumerPerTopic(10)
+                .build();
+        ScheduledExecutorService executors = Executors.newScheduledThreadPool(1);
+        // Publish the policy only after a delay, so getTopicPolicies() below has to be retried.
+        executors.schedule(() -> systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy),
+                2000, TimeUnit.MILLISECONDS);
+        executors.shutdown(); // the already-scheduled task still runs; its thread then exits instead of leaking
+        // Each failed attempt raises an AssertionError, which makes Awaitility poll again until it times out.
+        Awaitility.await().untilAsserted(() -> {
+            try {
+                TopicPolicies topicPolicies = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
+                Assert.assertNotNull(topicPolicies);
+            } catch (Exception ex) {
+                Assert.fail("Failed to get topic policies for " + TOPIC1, ex);
+            }
+        });
+    }
 }