Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/09 06:16:39 UTC

[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17562: [fix][broker] Topic policy reader can't recover when get any exception.

mattisonchao commented on code in PR #17562:
URL: https://github.com/apache/pulsar/pull/17562#discussion_r966658816


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);

Review Comment:
   At this point the reader is null because the future completed with an exception, so `reader.getSystemTopic()` throws a NullPointerException and we never clean the cache or close the reader.
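
To illustrate the point above, here is a minimal sketch of how `whenComplete` hands a null value to its callback when the future has failed. It uses a `String` as a stand-in for the reader type, and the class and method names are hypothetical, not taken from the Pulsar code base.

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteNullDemo {

    /**
     * Returns true if the value handed to whenComplete is null
     * when the future completed exceptionally.
     * The String type is only a stand-in for the real reader type.
     */
    static boolean valueIsNullOnFailure() {
        CompletableFuture<String> readerFuture = new CompletableFuture<>();
        readerFuture.completeExceptionally(new IllegalStateException("reader creation failed"));

        boolean[] sawNull = {false};
        // The future is already complete, so the callback runs on the calling thread.
        readerFuture.whenComplete((reader, ex) -> {
            if (ex != null) {
                // Dereferencing reader here would throw a NullPointerException.
                sawNull[0] = (reader == null);
            }
        });
        return sawNull[0];
    }

    public static void main(String[] args) {
        System.out.println(valueIsNullOnFailure());
    }
}
```

Any value needed on the failure path therefore has to come from the enclosing scope rather than from the callback's first argument.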



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    initPolicesCache(reader, result);
-                    result.thenRun(() -> readMorePolicies(reader));
-                }
+            readerCompletableFuture.thenAccept(reader -> {
+                initPolicesCache(reader, result);
+                result.thenRun(() -> readMorePolicies(reader));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+                cleanCacheAndCloseReader(namespace, false);

Review Comment:
   This method has no path that can throw an exception, so I think it is safe enough and doesn't need a try-catch.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -367,14 +367,18 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
         });
     }
 
-    private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) {
+    private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
         CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
         policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
         if (cleanOwnedBundlesCount) {
             ownedBundlesCountPerNamespace.remove(namespace);
         }
         if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
-            readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+            readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
+                    .exceptionally(ex -> {
+                        log.warn("[{}] Close change_event reader fail.", namespace, ex);

Review Comment:
   The log helps with debugging. We could introduce further mechanisms to clean up the resource, but that is outside the scope of this PR.
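
As a hedged illustration of why the switch from `thenAccept` to `thenCompose` matters for this log: with `thenAccept`, the future returned by `closeAsync` is discarded, so a failed close never reaches `exceptionally`. The sketch below uses a hypothetical `FailingReader` class as a stand-in for the real reader; none of these names come from the Pulsar code base.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseFailureDemo {

    /** Hypothetical stand-in for a reader whose asynchronous close always fails. */
    static class FailingReader {
        CompletableFuture<Void> closeAsync() {
            CompletableFuture<Void> closeFuture = new CompletableFuture<>();
            closeFuture.completeExceptionally(new IllegalStateException("close failed"));
            return closeFuture;
        }
    }

    /** thenAccept drops the future returned by closeAsync, so the failure goes unnoticed. */
    static boolean failureSeenWithThenAccept() {
        AtomicBoolean seen = new AtomicBoolean(false);
        CompletableFuture<FailingReader> readerFuture =
                CompletableFuture.completedFuture(new FailingReader());
        readerFuture.thenAccept(FailingReader::closeAsync)
                .exceptionally(ex -> {
                    seen.set(true);
                    return null;
                });
        return seen.get();
    }

    /** thenCompose flattens the close future, so its failure reaches exceptionally. */
    static boolean failureSeenWithThenCompose() {
        AtomicBoolean seen = new AtomicBoolean(false);
        CompletableFuture<FailingReader> readerFuture =
                CompletableFuture.completedFuture(new FailingReader());
        readerFuture.thenCompose(FailingReader::closeAsync)
                .exceptionally(ex -> {
                    seen.set(true);
                    return null;
                });
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("thenAccept saw failure: " + failureSeenWithThenAccept());
        System.out.println("thenCompose saw failure: " + failureSeenWithThenCompose());
    }
}
```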



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    initPolicesCache(reader, result);
-                    result.thenRun(() -> readMorePolicies(reader));
-                }
+            readerCompletableFuture.thenAccept(reader -> {

Review Comment:
   Use `thenAccept` so the callback runs only on success and we never touch the null value that `whenComplete` passes on failure.
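
A minimal sketch of this pattern, assuming a simplified cache keyed by a namespace string and a `String` in place of the reader type (both are hypothetical simplifications, not the actual Pulsar types): the success path lives in `thenAccept`, and the failure path in `exceptionally` uses the namespace captured from the enclosing method instead of the missing reader.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ThenAcceptDemo {

    // Simplified stand-in for the reader cache; String replaces the reader type.
    static final ConcurrentMap<String, CompletableFuture<String>> readerCaches =
            new ConcurrentHashMap<>();

    static void prepare(String namespace, CompletableFuture<String> readerFuture) {
        readerCaches.put(namespace, readerFuture);
        readerFuture.thenAccept(reader -> {
            // Runs only when the reader was created successfully.
            // The real code would initialize the policies cache here.
        }).exceptionally(ex -> {
            // Runs on failure; the namespace comes from the method parameter,
            // so no value from the failed future is dereferenced.
            readerCaches.remove(namespace);
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("reader creation failed"));
        prepare("ns-fail", failed);
        prepare("ns-ok", CompletableFuture.completedFuture("reader"));
        System.out.println(readerCaches.keySet());
    }
}
```

Note that `exceptionally` chained after `thenAccept` also receives exceptions thrown inside the `thenAccept` callback itself, and exceptions from the upstream future arrive wrapped in a `CompletionException`.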


