You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/16 07:31:39 UTC
[pulsar] branch branch-2.7 updated: [Branch 2.7] Fix TopicPoliciesCacheNotInitException issue. (#14293)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 0abf26b [Branch 2.7] Fix TopicPoliciesCacheNotInitException issue. (#14293)
0abf26b is described below
commit 0abf26bf91744bf6b84cd56d16cf45d0b639e230
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Feb 16 15:30:02 2022 +0800
[Branch 2.7] Fix TopicPoliciesCacheNotInitException issue. (#14293)
Cherry pick from PR: #12773
### Motivation
Sometimes, we may get TopicPoliciesCacheNotInitException with below stack trace:
```java
15:45:47.020 [pulsar-web-41-3] INFO org.eclipse.jetty.server.RequestLog - 10.0.0.42 - - [10/Nov/2021:15:45:47 +0000] "GET /status.html HTTP/1.1" 200 2 "-" "kube-probe/1.19+" 1
15:45:51.221 [pulsar-2-15] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to perform getRetention on topic persistent://public/default/UpdateNodeCharts
java.lang.RuntimeException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
at org.apache.pulsar.broker.service.TopicPoliciesService.lambda$getTopicPoliciesAsyncWithRetry$0(TopicPoliciesService.java:84) ~[io.streamnative-pulsar-broker-2.8.1.21.jar:2.8.1.21]
at org.apache.pulsar.client.util.RetryUtil.executeWithRetry(RetryUtil.java:50) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
at org.apache.pulsar.client.util.RetryUtil.lambda$executeWithRetry$1(RetryUtil.java:63) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.68.Final.jar:4.1.68.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
```
This is because when reader.readNextAsync() throws exceptions, the msg will be null which will throw NPE without any catch block.
### Modification
- Close reader when reader.readNextAsync() throw exceptions.
- Remove reducant field policyCacheInitMap . we can use readerCaches instead.
- Add retry logic when getTopicPolicies.
---
.../SystemTopicBasedTopicPoliciesService.java | 46 +++++++++++++++-------
.../SystemTopicBasedTopicPoliciesServiceTest.java | 37 ++++++++++++++++-
2 files changed, 67 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 3dc5a10..0541c7a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -144,6 +144,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
@Override
public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException {
+ if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
+ NamespaceName namespace = topicName.getNamespaceObject();
+ prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+ }
if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
&& !policyCacheInitMap.get(topicName.getNamespaceObject())) {
throw new TopicPoliciesCacheNotInitException();
@@ -151,6 +155,25 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
}
+ private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+ if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
+ CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture = namespaceEventsSystemTopicFactory
+ .createTopicPoliciesSystemTopicClient(namespace).newReaderAsync();
+ readerCaches.put(namespace, readerCompletableFuture);
+ readerCompletableFuture.whenComplete((reader, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+ result.completeExceptionally(ex);
+ readerCaches.remove(namespace);
+ reader.closeAsync();
+ } else {
+ initPolicesCache(reader, result);
+ result.thenRun(() -> readMorePolicies(reader));
+ }
+ });
+ }
+ }
+
@Override
public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) {
CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
@@ -180,21 +203,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
result.complete(null);
} else {
- SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory.createSystemTopic(namespace
- , EventType.TOPIC_POLICY);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
- policyCacheInitMap.put(namespace, false);
- CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture = systemTopicClient.newReaderAsync();
- readerCaches.put(namespace, readerCompletableFuture);
- readerCompletableFuture.whenComplete((reader, ex) -> {
- if (ex != null) {
- log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
- result.completeExceptionally(ex);
- } else {
- initPolicesCache(reader, result);
- result.thenRun(() -> readMorePolicies(reader));
- }
- });
+ prepareInitPoliciesCache(namespace, result);
}
}
return result;
@@ -249,14 +259,20 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(ex);
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+ policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+ reader.closeAsync();
+ return;
}
if (hasMore) {
reader.readNextAsync().whenComplete((msg, e) -> {
if (e != null) {
log.error("[{}] Failed to read event from the system topic.",
- reader.getSystemTopic().getTopicName(), ex);
+ reader.getSystemTopic().getTopicName(), e);
future.completeExceptionally(e);
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+ policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+ reader.closeAsync();
+ return;
}
refreshTopicPoliciesCache(msg);
if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 9eb8669..c806840 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -33,8 +34,13 @@ import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest {
@@ -193,4 +199,33 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
}
+
+ @Test
+ public void testGetTopicPoliciesWithRetry() throws Exception {
+ Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
+ initMapField.setAccessible(true);
+ Map<NamespaceName, Boolean> initMap = (Map)initMapField.get(systemTopicBasedTopicPoliciesService);
+ initMap.remove(NamespaceName.get(NAMESPACE1));
+ Field readerCaches = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches");
+ readerCaches.setAccessible(true);
+ Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader>> readers = (Map)readerCaches.get(systemTopicBasedTopicPoliciesService);
+ readers.remove(NamespaceName.get(NAMESPACE1));
+ TopicPolicies initPolicy = TopicPolicies.builder()
+ .maxConsumerPerTopic(10)
+ .build();
+ ScheduledExecutorService executors = Executors.newScheduledThreadPool(1);
+ executors.schedule(() -> {
+ try {
+ systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
+ } catch (Exception ignore) {}
+ }, 2000, TimeUnit.MILLISECONDS);
+ Awaitility.await().untilAsserted(() -> {
+ try {
+ TopicPolicies topicPolicies = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
+ Assert.assertNotNull(topicPolicies);
+ } catch (Exception ex) {
+ Assert.fail();
+ }
+ });
+ }
}