You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/17 01:36:30 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Fix get topic policies as null during clean cache (#20763)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 17a4bcf052c [fix][broker] Fix get topic policies as null during clean cache (#20763)
17a4bcf052c is described below
commit 17a4bcf052ca4bbfaae4c17bbcedacda47bcc790
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Jul 12 20:03:50 2023 +0800
[fix][broker] Fix get topic policies as null during clean cache (#20763)
---
.../SystemTopicBasedTopicPoliciesService.java | 32 +++++++---
.../pulsar/broker/admin/TopicPoliciesTest.java | 1 +
.../SystemTopicBasedTopicPoliciesServiceTest.java | 73 ++++++++++++++++++++++
3 files changed, 99 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index c171fea0a6f..419c6d88b22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
+import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -220,12 +221,25 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
NamespaceName namespace = topicName.getNamespaceObject();
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
}
- if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
- && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
- throw new TopicPoliciesCacheNotInitException();
+
+ MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result = new MutablePair<>();
+ policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> {
+ if (initialized == null || !initialized) {
+ result.setLeft(new TopicPoliciesCacheNotInitException());
+ } else {
+ TopicPolicies topicPolicies =
+ isGlobal ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
+ : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+ result.setRight(topicPolicies);
+ }
+ return initialized;
+ });
+
+ if (result.getLeft() != null) {
+ throw result.getLeft();
+ } else {
+ return result.getRight();
}
- return isGlobal ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
- : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
}
@Override
@@ -389,7 +403,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
- policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+
if (cleanOwnedBundlesCount) {
ownedBundlesCountPerNamespace.remove(namespace);
}
@@ -400,7 +414,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
return null;
});
}
- policyCacheInitMap.remove(namespace);
+
+ policyCacheInitMap.compute(namespace, (k, v) -> {
+ policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+ return null;
+ });
}
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 36f27877360..f3e3188de4b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2953,6 +2953,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
});
}
}
+
@Test
public void testGlobalTopicPolicies() throws Exception {
final String topic = testTopic + UUID.randomUUID();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index e06c256ccb5..c90c4a3a5bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -33,10 +33,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
@@ -53,13 +56,16 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
+import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
+@Slf4j
public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest {
private static final String NAMESPACE1 = "system-topic/namespace-1";
@@ -385,4 +391,71 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
});
service.deleteTopicPoliciesAsync(TOPIC1).get();
}
+
+ @Test
+ public void testGetTopicPoliciesWithCleanCache() throws Exception {
+ final String topic = "persistent://" + NAMESPACE1 + "/test" + UUID.randomUUID();
+ pulsarClient.newProducer().topic(topic).create().close();
+
+ SystemTopicBasedTopicPoliciesService topicPoliciesService =
+ (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+
+ ConcurrentHashMap<TopicName, TopicPolicies> spyPoliciesCache = spy(new ConcurrentHashMap<TopicName, TopicPolicies>());
+ FieldUtils.writeDeclaredField(topicPoliciesService, "policiesCache", spyPoliciesCache, true);
+
+ Awaitility.await().untilAsserted(() -> {
+ Assertions.assertThat(topicPoliciesService.getTopicPolicies(TopicName.get(topic))).isNull();
+ });
+
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ Awaitility.await().untilAsserted(() -> {
+ Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
+ });
+
+ Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers =
+ (Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>)
+ FieldUtils.readDeclaredField(topicPoliciesService, "readerCaches", true);
+
+ Mockito.doAnswer(invocation -> {
+ Thread.sleep(1000);
+ return invocation.callRealMethod();
+ }).when(spyPoliciesCache).get(Mockito.any());
+
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ Thread thread = new Thread(() -> {
+ TopicPolicies topicPolicies;
+ for (int i = 0; i < 10; i++) {
+ try {
+ topicPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic));
+ Assert.assertNotNull(topicPolicies);
+ Thread.sleep(500);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.warn("topic policies cache not init, retry...");
+ } catch (Throwable e) {
+ log.error("ops: ", e);
+ result.completeExceptionally(e);
+ return;
+ }
+ }
+ result.complete(null);
+ });
+
+ Thread thread2 = new Thread(() -> {
+ for (int i = 0; i < 10; i++) {
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
+ readers.get(TopicName.get(topic).getNamespaceObject());
+ if (readerCompletableFuture != null) {
+ readerCompletableFuture.join().closeAsync().join();
+ }
+ }
+ });
+
+ thread.start();
+ thread2.start();
+
+ thread.join();
+ thread2.join();
+
+ result.join();
+ }
}