Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/09 06:10:27 UTC

[GitHub] [pulsar] mattisonchao opened a new pull request, #17562: [fix][broker] Topic policy reader can't recover when get any exception.

mattisonchao opened a new pull request, #17562:
URL: https://github.com/apache/pulsar/pull/17562

   ### Motivation
   
   When creating the topic policy reader fails (for example, because of a network problem), we do not clear the current state, so the broker cannot retry initializing the topic policy reader until the namespace is unloaded.
   
   > Please see the details in the code.
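
The stuck state described in this section can be sketched as follows. This is a minimal illustration, not the broker code: `InitGuardSketch`, `initMap`, `tryInit` and the `clearOnFailure` flag are hypothetical names, and the sketch assumes the init guard is a `putIfAbsent` on a per-namespace map that must be removed again on failure.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a per-namespace init guard only allows a retry
// if the failure path removes the marker again.
final class InitGuardSketch {
    private final Map<String, Boolean> initMap = new ConcurrentHashMap<>();
    private int attempts = 0;

    // Returns true if an init attempt was actually started.
    boolean tryInit(String namespace, boolean failing, boolean clearOnFailure) {
        if (initMap.putIfAbsent(namespace, false) != null) {
            return false; // guard already set: no new attempt is made
        }
        attempts++;
        if (failing) {
            if (clearOnFailure) {
                initMap.remove(namespace); // clear state so a later call can retry
            }
        } else {
            initMap.put(namespace, true);
        }
        return true;
    }

    int attempts() {
        return attempts;
    }

    public static void main(String[] args) {
        InitGuardSketch stuck = new InitGuardSketch();
        stuck.tryInit("tenant/ns", true, false);
        System.out.println("retry without cleanup started: "
                + stuck.tryInit("tenant/ns", false, false));

        InitGuardSketch fixed = new InitGuardSketch();
        fixed.tryInit("tenant/ns", true, true);
        System.out.println("retry with cleanup started: "
                + fixed.tryInit("tenant/ns", false, true));
    }
}
```

Without the cleanup, the marker left behind by the failed attempt blocks every later attempt for that namespace.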
   
   ### Modifications
   
   - Fix the incorrect failure-handling logic and make the method safer.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17562: [fix][broker] Topic policy reader can't recover when get any exception.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17562:
URL: https://github.com/apache/pulsar/pull/17562#discussion_r966658816


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);

Review Comment:
   At this point the reader is null because the future completed with an exception, so `reader.getSystemTopic()` throws a `NullPointerException` and we never clean the cache or close the reader.
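
A minimal sketch of the problem described in this comment, using only `java.util.concurrent.CompletableFuture`. `FakeReader` and `WhenCompleteNullSketch` are hypothetical stand-ins, not Pulsar classes. It shows that when a future fails, `whenComplete` receives a null value, so dereferencing it throws inside the callback and the cleanup line after it is never reached.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the reader type; not the Pulsar class.
final class FakeReader {
    String namespace() {
        return "tenant/ns";
    }
}

final class WhenCompleteNullSketch {
    // Returns true only if the cleanup step ran after the future failed.
    static boolean cleanupRan() {
        AtomicBoolean cleaned = new AtomicBoolean(false);
        CompletableFuture<FakeReader> readerFuture = new CompletableFuture<>();
        readerFuture.whenComplete((reader, ex) -> {
            if (ex != null) {
                // reader is null on this branch, so this call throws
                // NullPointerException; the exception is captured by the
                // future returned from whenComplete.
                String ns = reader.namespace();
                cleaned.set(true); // never reached
            }
        });
        readerFuture.completeExceptionally(new RuntimeException("network problem"));
        return cleaned.get();
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran: " + cleanupRan());
    }
}
```

Because the exception thrown in the callback ends up in the future that `whenComplete` returns, and nothing observes that future, the failure is silent.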



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    initPolicesCache(reader, result);
-                    result.thenRun(() -> readMorePolicies(reader));
-                }
+            readerCompletableFuture.thenAccept(reader -> {
+                initPolicesCache(reader, result);
+                result.thenRun(() -> readMorePolicies(reader));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+                cleanCacheAndCloseReader(namespace, false);

Review Comment:
   This method has no way to throw an exception, so I think it is safe enough and doesn't need a try-catch.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -367,14 +367,18 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
         });
     }
 
-    private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) {
+    private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
         CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
         policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
         if (cleanOwnedBundlesCount) {
             ownedBundlesCountPerNamespace.remove(namespace);
         }
         if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
-            readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+            readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
+                    .exceptionally(ex -> {
+                        log.warn("[{}] Close change_event reader fail.", namespace, ex);

Review Comment:
   We can add a log to help with debugging. We may want to introduce more mechanisms to clean up the resource later, but that is not the focus of this PR.
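
As a hedged illustration of why the chain needs `thenCompose` for the log to fire: `thenAccept` discards the future returned by `closeAsync`, so a failed close never reaches `exceptionally`, while `thenCompose` flattens it into the chain. `CloseChainSketch` and `FailingReader` are hypothetical names used only for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class CloseChainSketch {
    // Hypothetical reader whose asynchronous close always fails.
    static final class FailingReader {
        CompletableFuture<Void> closeAsync() {
            CompletableFuture<Void> f = new CompletableFuture<>();
            f.completeExceptionally(new IllegalStateException("close failed"));
            return f;
        }
    }

    // thenAccept drops the future returned by closeAsync, so the handler never sees the failure.
    static Throwable observedWithThenAccept() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture.completedFuture(new FailingReader())
                .thenAccept(FailingReader::closeAsync)
                .exceptionally(ex -> {
                    seen.set(ex);
                    return null;
                });
        return seen.get();
    }

    // thenCompose makes the close result part of the chain, so the handler runs.
    static Throwable observedWithThenCompose() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture.completedFuture(new FailingReader())
                .thenCompose(FailingReader::closeAsync)
                .exceptionally(ex -> {
                    seen.set(ex);
                    return null;
                });
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("thenAccept saw failure: " + (observedWithThenAccept() != null));
        System.out.println("thenCompose saw failure: " + (observedWithThenCompose() != null));
    }
}
```

The throwable seen by the handler may be wrapped in a `CompletionException`, so code that inspects it should check the cause as well.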



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    initPolicesCache(reader, result);
-                    result.thenRun(() -> readMorePolicies(reader));
-                }
+            readerCompletableFuture.thenAccept(reader -> {

Review Comment:
   Use `thenAccept` so that we never touch the null value that `whenComplete` passes on failure.
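
The two-branch shape of the new chain can be sketched like this. It is an illustration under assumptions, not the broker code: `ThenAcceptSketch` and the event strings are hypothetical, and a `String` stands in for the reader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ThenAcceptSketch {
    // Wires the chain and records which branch ran.
    static List<String> run(CompletableFuture<String> readerFuture) {
        List<String> events = new ArrayList<>();
        readerFuture.thenAccept(reader -> {
            // Only invoked on success, never with the null placeholder of a failed future.
            events.add("init:" + reader);
        }).exceptionally(ex -> {
            // Invoked if readerFuture failed or if the callback above threw.
            events.add("cleanup");
            return null;
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(CompletableFuture.completedFuture("r1")));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("network problem"));
        System.out.println(run(failed));
    }
}
```

The failure branch needs nothing from the reader, which is why the cleanup call in the diff takes the `namespace` argument instead.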





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17562: [fix][broker] Topic policy reader can't recover when get any exception.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17562:
URL: https://github.com/apache/pulsar/pull/17562#discussion_r966658816


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);

Review Comment:
   **At this point the reader is null, so we can't clean the cache or close the reader.**





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17562: [fix][broker] Topic policy reader can't recover when get any exception.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17562:
URL: https://github.com/apache/pulsar/pull/17562#discussion_r966658816


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);

Review Comment:
   **At this point the reader is null because we got an exception, so we can't clean the cache or close the reader.**





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17562: [fix][broker] Topic policy reader can't recover when get any exception.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17562:
URL: https://github.com/apache/pulsar/pull/17562#discussion_r966658267


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    initPolicesCache(reader, result);
-                    result.thenRun(() -> readMorePolicies(reader));
-                }
+            readerCompletableFuture.thenAccept(reader -> {

Review Comment:
   Use `thenAccept` so that we never touch the null value that `whenComplete` passes on failure. Because we already pass the future to the next operation, we don't need `thenCompose`.





[GitHub] [pulsar] codelipenghui merged pull request #17562: [fix][broker] Topic policy reader can't recover when get any exception.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17562:
URL: https://github.com/apache/pulsar/pull/17562




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17562: [fix][broker] Topic policy reader can't recover when get any exception.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17562:
URL: https://github.com/apache/pulsar/pull/17562#discussion_r966658267


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -252,20 +253,19 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    initPolicesCache(reader, result);
-                    result.thenRun(() -> readMorePolicies(reader));
-                }
+            readerCompletableFuture.thenAccept(reader -> {

Review Comment:
   Use `thenAccept` so that we never touch the null value that `whenComplete` passes on failure. Because we already pass the `result` future to the next operation, we don't need `thenCompose`.





[GitHub] [pulsar] fmiguelez commented on pull request #17562: [fix][broker] Topic policy reader can't recover when get any exception.

Posted by "fmiguelez (via GitHub)" <gi...@apache.org>.
fmiguelez commented on PR #17562:
URL: https://github.com/apache/pulsar/pull/17562#issuecomment-1729163511

   @mattisonchao Is there any plan to release 2.8.5 shortly?
   This issue is critical for us and we are currently running Pulsar 2.8.4.

