You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/17 01:36:30 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] Fix get topic policies as null during clean cache (#20763)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 17a4bcf052c [fix][broker] Fix get topic policies as null during clean cache (#20763)
17a4bcf052c is described below

commit 17a4bcf052ca4bbfaae4c17bbcedacda47bcc790
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Jul 12 20:03:50 2023 +0800

    [fix][broker] Fix get topic policies as null during clean cache (#20763)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 32 +++++++---
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  1 +
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 73 ++++++++++++++++++++++
 3 files changed, 99 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index c171fea0a6f..419c6d88b22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -220,12 +221,25 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             NamespaceName namespace = topicName.getNamespaceObject();
             prepareInitPoliciesCache(namespace, new CompletableFuture<>());
         }
-        if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
-                && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
-            throw new TopicPoliciesCacheNotInitException();
+
+        MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result = new MutablePair<>();
+        policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> {
+            if (initialized == null || !initialized) {
+                result.setLeft(new TopicPoliciesCacheNotInitException());
+            } else {
+                TopicPolicies topicPolicies =
+                        isGlobal ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
+                                : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+                result.setRight(topicPolicies);
+            }
+            return initialized;
+        });
+
+        if (result.getLeft() != null) {
+            throw result.getLeft();
+        } else {
+            return result.getRight();
         }
-        return isGlobal ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
-                : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
     @Override
@@ -389,7 +403,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
         CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
-        policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+
         if (cleanOwnedBundlesCount) {
             ownedBundlesCountPerNamespace.remove(namespace);
         }
@@ -400,7 +414,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         return null;
                     });
         }
-        policyCacheInitMap.remove(namespace);
+
+        policyCacheInitMap.compute(namespace, (k, v) -> {
+            policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+            return null;
+        });
     }
 
     private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 36f27877360..f3e3188de4b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2953,6 +2953,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
             });
         }
     }
+
     @Test
     public void testGlobalTopicPolicies() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index e06c256ccb5..c90c4a3a5bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -33,10 +33,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
@@ -53,13 +56,16 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
+@Slf4j
 public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest {
 
     private static final String NAMESPACE1 = "system-topic/namespace-1";
@@ -385,4 +391,71 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         });
         service.deleteTopicPoliciesAsync(TOPIC1).get();
     }
+
+    @Test
+    public void testGetTopicPoliciesWithCleanCache() throws Exception {
+        final String topic = "persistent://" + NAMESPACE1 + "/test" + UUID.randomUUID();
+        pulsarClient.newProducer().topic(topic).create().close();
+
+        SystemTopicBasedTopicPoliciesService topicPoliciesService =
+                (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+
+        ConcurrentHashMap<TopicName, TopicPolicies> spyPoliciesCache = spy(new ConcurrentHashMap<TopicName, TopicPolicies>());
+        FieldUtils.writeDeclaredField(topicPoliciesService, "policiesCache", spyPoliciesCache, true);
+
+            Awaitility.await().untilAsserted(() -> {
+                Assertions.assertThat(topicPoliciesService.getTopicPolicies(TopicName.get(topic))).isNull();
+        });
+
+        admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+        Awaitility.await().untilAsserted(() -> {
+                Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
+            });
+
+        Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers =
+                (Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>)
+                        FieldUtils.readDeclaredField(topicPoliciesService, "readerCaches", true);
+
+        Mockito.doAnswer(invocation -> {
+            Thread.sleep(1000);
+            return invocation.callRealMethod();
+        }).when(spyPoliciesCache).get(Mockito.any());
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        Thread thread = new Thread(() -> {
+            TopicPolicies topicPolicies;
+            for (int i = 0; i < 10; i++) {
+                try {
+                    topicPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic));
+                    Assert.assertNotNull(topicPolicies);
+                    Thread.sleep(500);
+                } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+                    log.warn("topic policies cache not init, retry...");
+                } catch (Throwable e) {
+                    log.error("ops: ", e);
+                    result.completeExceptionally(e);
+                    return;
+                }
+            }
+            result.complete(null);
+        });
+
+        Thread thread2 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
+                        readers.get(TopicName.get(topic).getNamespaceObject());
+                if (readerCompletableFuture != null) {
+                    readerCompletableFuture.join().closeAsync().join();
+                }
+            }
+        });
+
+        thread.start();
+        thread2.start();
+
+        thread.join();
+        thread2.join();
+
+        result.join();
+    }
 }