Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 23:40:33 UTC

[pulsar] branch branch-2.8 updated: Remove readerCaches and close reader when exception occurs in SystemTopicBasedTopicPoliciesService. (#12873)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 183734a  Remove readerCaches and close reader when exception occurs in SystemTopicBasedTopicPoliciesService. (#12873)
183734a is described below

commit 183734acb324060184267bc55c27654fc264dd71
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Nov 19 07:35:47 2021 +0800

    Remove readerCaches and close reader when exception occurs in SystemTopicBasedTopicPoliciesService. (#12873)
    
    (cherry picked from commit bcc219b5308379fb354b32eb858b4f54868721d5)
---
 .../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java     | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index bea4e8e..10f47e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -227,6 +227,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 if (ex != null) {
                     log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
                     result.completeExceptionally(ex);
+                    readerCaches.remove(namespace);
+                    reader.closeAsync();
                 } else {
                     initPolicesCache(reader, result);
                     result.thenRun(() -> readMorePolicies(reader));
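
The cleanup added in the hunk above (drop the cached entry and close the reader when reader creation fails) can be sketched in isolation as follows. This is a minimal illustration, not the broker's code: the Reader interface, the initReader method, and the class name are hypothetical stand-ins, and readerCaches is assumed here to be a map from namespace to the reader future. The sketch also adds a null check before closeAsync(), since CompletableFuture.whenComplete passes a null value to the callback when the stage completed exceptionally.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ReaderCacheCleanupSketch {

    /** Hypothetical stand-in for a system topic reader; not Pulsar's API. */
    interface Reader {
        CompletableFuture<Void> closeAsync();
    }

    /** Assumed shape of the cache: namespace -> future of the reader. */
    static final Map<String, CompletableFuture<Reader>> readerCaches =
            new ConcurrentHashMap<>();

    /** Caches the reader future and cleans up if reader creation fails. */
    static CompletableFuture<Void> initReader(String namespace,
                                              CompletableFuture<Reader> readerFuture) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        readerCaches.put(namespace, readerFuture);
        readerFuture.whenComplete((reader, ex) -> {
            if (ex != null) {
                result.completeExceptionally(ex);
                // Drop the failed future so a later call can create a new reader.
                readerCaches.remove(namespace, readerFuture);
                // On failure the value passed to whenComplete is null, so guard.
                if (reader != null) {
                    reader.closeAsync();
                }
            } else {
                result.complete(null);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Reader> failing = new CompletableFuture<>();
        CompletableFuture<Void> result = initReader("tenant/ns", failing);
        failing.completeExceptionally(new IllegalStateException("reader creation failed"));

        System.out.println("result failed: " + result.isCompletedExceptionally());
        System.out.println("still cached: " + readerCaches.containsKey("tenant/ns"));
    }
}
```

Removing the entry with remove(key, value) rather than remove(key) is a design choice of this sketch: it avoids evicting a newer future that another caller may have cached for the same namespace in the meantime.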