You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/09/13 01:44:12 UTC
[pulsar] branch branch-2.9 updated: [fix][broker] Topic policy reader can't recover when get any exception (#17562)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new db8480b66f0 [fix][broker] Topic policy reader can't recover when get any exception (#17562)
db8480b66f0 is described below
commit db8480b66f0634193e85de01358a1e34648d5595
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Tue Sep 13 09:06:25 2022 +0800
[fix][broker] Topic policy reader can't recover when get any exception (#17562)
(cherry picked from commit 5aa1f1101c6fef61163b34558cc98bf362dfa969)
---
.../SystemTopicBasedTopicPoliciesService.java | 28 ++++++++++++----------
1 file changed, 16 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index a5bcaa5728d..38ec5d803c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -230,21 +231,20 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
return result;
}
- private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+ private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
- readerCompletableFuture.whenComplete((reader, ex) -> {
- if (ex != null) {
- log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
- result.completeExceptionally(ex);
- cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
- } else {
- initPolicesCache(reader, result);
- result.thenRun(() -> readMorePolicies(reader));
- }
+ readerCompletableFuture.thenAccept(reader -> {
+ initPolicesCache(reader, result);
+ result.thenRun(() -> readMorePolicies(reader));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+ cleanCacheAndCloseReader(namespace, false);
+ result.completeExceptionally(ex);
+ return null;
});
}
}
@@ -346,14 +346,18 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
});
}
- private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) {
+ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
if (cleanOwnedBundlesCount) {
ownedBundlesCountPerNamespace.remove(namespace);
}
if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
- readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+ readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
+ .exceptionally(ex -> {
+ log.warn("[{}] Close change_event reader fail.", namespace, ex);
+ return null;
+ });
}
policyCacheInitMap.remove(namespace);
}