You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/12/23 14:41:14 UTC
[pulsar] 08/08: Fixes the NPE in system topics policies service (#13469)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1a32edc69eb017d1a95f19708a029aab9aeddbf8
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Dec 23 19:27:23 2021 +0800
Fixes the NPE in system topics policies service (#13469)
---
*Motivation*
The `namespaceEventsSystemTopicFactory` is created when you will
use it. But the `createSystemTopicFactoryIfNeeded()` may failed
which will cause the `namespaceEventsSystemTopicFactory` is null
and throw a NPE error from the method.
*Modifications*
- throw the error and failed the method when there has exceptions in
`createSystemTopicFactoryIfNeeded()`
(cherry picked from commit 4022b2884f46bb5e1593da419bb226ad1e0fc768)
---
.../SystemTopicBasedTopicPoliciesService.java | 31 +++++++++++++++++-----
1 file changed, 24 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 1cc71e9..0b305ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -88,9 +88,13 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
TopicPolicies policies) {
- createSystemTopicFactoryIfNeeded();
-
CompletableFuture<Void> result = new CompletableFuture<>();
+ try {
+ createSystemTopicFactoryIfNeeded();
+ } catch (PulsarServerException e) {
+ result.completeExceptionally(e);
+ return result;
+ }
SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
@@ -186,8 +190,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
@Override
public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) {
CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
- createSystemTopicFactoryIfNeeded();
- if (namespaceEventsSystemTopicFactory == null) {
+ try {
+ createSystemTopicFactoryIfNeeded();
+ } catch (PulsarServerException e) {
result.complete(null);
return result;
}
@@ -206,7 +211,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
result.complete(null);
return result;
}
- createSystemTopicFactoryIfNeeded();
synchronized (this) {
if (readerCaches.get(namespace) != null) {
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
@@ -240,9 +244,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry(
NamespaceName namespace) {
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
+ try {
+ createSystemTopicFactoryIfNeeded();
+ } catch (PulsarServerException e) {
+ result.completeExceptionally(e);
+ return result;
+ }
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(namespace);
- CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
RetryUtil.retryAsynchronously(() -> {
try {
@@ -389,6 +399,12 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
// However, due to compatibility, it is temporarily retained here
// and can be deleted in the future.
policiesCache.remove(topicName);
+ try {
+ createSystemTopicFactoryIfNeeded();
+ } catch (PulsarServerException e) {
+ log.error("Failed to create system topic factory");
+ break;
+ }
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newWriterAsync().thenAccept(writer
@@ -408,7 +424,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
}
- private void createSystemTopicFactoryIfNeeded() {
+ private void createSystemTopicFactoryIfNeeded() throws PulsarServerException {
if (namespaceEventsSystemTopicFactory == null) {
synchronized (this) {
if (namespaceEventsSystemTopicFactory == null) {
@@ -417,6 +433,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
} catch (PulsarServerException e) {
log.error("Create namespace event system topic factory error.", e);
+ throw e;
}
}
}