You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/13 07:20:58 UTC
[pulsar] branch branch-2.11 updated: [fix][broker] Topic policy reader can't recover when get any exception (#17562)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new c4bdf2ce1af [fix][broker] Topic policy reader can't recover when get any exception (#17562)
c4bdf2ce1af is described below
commit c4bdf2ce1af1512265952643710948684b7bb9b6
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Tue Sep 13 09:06:25 2022 +0800
[fix][broker] Topic policy reader can't recover when get any exception (#17562)
---
.../SystemTopicBasedTopicPoliciesService.java | 28 ++++++++++++----------
1 file changed, 16 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 93f97bbce07..0e0e4950c36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -252,20 +253,19 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
return result;
}
- private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+ private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
- readerCompletableFuture.whenComplete((reader, ex) -> {
- if (ex != null) {
- log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
- result.completeExceptionally(ex);
- cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
- } else {
- initPolicesCache(reader, result);
- result.thenRun(() -> readMorePolicies(reader));
- }
+ readerCompletableFuture.thenAccept(reader -> {
+ initPolicesCache(reader, result);
+ result.thenRun(() -> readMorePolicies(reader));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+ cleanCacheAndCloseReader(namespace, false);
+ result.completeExceptionally(ex);
+ return null;
});
}
}
@@ -367,14 +367,18 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
});
}
- private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) {
+ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
if (cleanOwnedBundlesCount) {
ownedBundlesCountPerNamespace.remove(namespace);
}
if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
- readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+ readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
+ .exceptionally(ex -> {
+ log.warn("[{}] Close change_event reader fail.", namespace, ex);
+ return null;
+ });
}
policyCacheInitMap.remove(namespace);
}