You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/27 16:25:45 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #16247: [fix] [broker] Repeat create same topic

poorbarcode opened a new pull request, #16247:
URL: https://github.com/apache/pulsar/pull/16247

   ### Motivation
   E.g. `TB.recover`, `topic.unload`, `lookup` executed at the same time:
   
   | Time | `TB.recover` | `topic.unload` | `lookup-a` | `lookup-b` |
   | ----------- | ----------- | ----------- | ----------- | ----------- |
   | 1 | TB recover |  |  |  |
   | 2 | TB recover failure | topic.unload |  |  |
   | 3 | topic.close | topic.close |  |  |
   | 4 | brokerService.topics.remove(topicName) |  |  |  |
   | 5 | remove `topic` |  | lookup |  |
   | 6 |  |  | create `topic'` |  |
   | 7 |  | brokerService.topics.remove(topicName) |  |  |
   | 8 |  | remove `topic'` |  | lookup |
   | 9 |  |  |  | create `topic''`  |
   
   In step 8: 
   - `topic'` removed from `broker-service-cache-mapping`, but it still exists 
   
   After step 9:
   - we have two same topics: [`topic'`, `topic''`]
   - `topic''` mapping in  `broker-service-cache-mapping`
   - `topic'` is not mapping in  `broker-service-cache-mapping`, but it may be used by the client.
   
   We should remove “The specify Topic Refer” to prevent errors from concurrent operations.
   
   ### Modifications
   
   use `remove(key, value)` instead `remove(key)`.
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs, and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1170101266

   Hi @codelipenghui 
   
   > We also should consider to avoid a topic can be closed more than once.
   
   I have appended the reason why `topic.close` was executed twice to the Motivation, and in this PR I've overwritten the `topic.close` to fix it. I also added a lock to the 'reset topic stat to UN-fenced' operation, could you review the code.
   
    > We can use map.compute() to simplify the logic? 
   
   Unfortunately, we can't use `map.compute` tto simplify the logic.
   
   > And looks like we don't need to wait for the future complete, because we already have the topic reference here.
    Yes, I have fixed it. The current implementation doesn't need to wait for the future to complete.
   
   I also rewritten the Motivation of this PR to make it easier to understand. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1218205977

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode closed pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode closed pull request #16247: [fix] [broker] The broker has two identical Persitenttopics
URL: https://github.com/apache/pulsar/pull/16247


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r908205993


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1918,15 +1918,38 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = getTopic(topicNameString, false);
+        return createTopicFuture.thenCompose(optionalTopic -> {
+            if (optionalTopic.isPresent() && optionalTopic.get() == topic){
+                return removeTopicFromCache(topicNameString, createTopicFuture);
+            }
+            // If topic is not in Cache, do nothing.
+            return CompletableFuture.completedFuture(null);
+        });

Review Comment:
   We can use `map.compute()` to simplify the logic? And looks like we don't need to wait for the future complete, because we already have the topic reference here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r958321455


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // Just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Exception t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();

Review Comment:
   The code in line:1294~line:1298 handles the logic `closeWithoutWaitingClientDisconnect`: 
   
   ```java
   CompletableFuture<Void> closePhase2Future;
   if (closeWithoutWaitingClientDisconnect){
       closePhase2Future = asyncCloseLedger();
   } else {
       closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode closed pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode closed pull request #16247: [fix] [broker] The broker has two identical Persitenttopics
URL: https://github.com/apache/pulsar/pull/16247


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Jason918 commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
Jason918 commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1229151451

   @codelipenghui @eolivelli @lhotari PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r958323439


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,50 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){
+            return CompletableFuture.completedFuture(null);
+        }
+        return createTopicFuture.thenCompose(topicOptional -> {
+            Topic topicInCache = topicOptional.orElse(null);
+            // If @param topic is not equals with cached, do nothing.
+            if (topicInCache == null || topicInCache != topic){
+                return CompletableFuture.completedFuture(null);
+            } else {
+                // Do remove.
+                return removeTopicFromCache(topicNameString, createTopicFuture);
+            }
+        });
+    }
+
+    public CompletableFuture<Void> removeTopicFromCache(String topic){
+        return removeTopicFromCache(topic, (CompletableFuture) null);

Review Comment:
   Already fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] Repeat creates same topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1168166411

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1177192642

   @merlimat Could you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r948146674


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,46 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){

Review Comment:
   I think that we should wait for this future to complete, otherwise it may create another Topic



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.

Review Comment:
   please remove this kind of comments "I don't know"



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.thenApply(__ -> {
+            resultFuture.complete(null);
+            return null;
+        }).exceptionally(exception -> {
+            log.error("[{}] Error closing topic", topic, exception);
+            // Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
+            // TODO Guarantee rate-limiter open finish.

Review Comment:
   please to not leave "TODOs", open a new GH ticket and link it



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.thenApply(__ -> {
+            resultFuture.complete(null);
+            return null;
+        }).exceptionally(exception -> {
+            log.error("[{}] Error closing topic", topic, exception);
+            // Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
+            // TODO Guarantee rate-limiter open finish.
+            try {
+                restartLimitersAfterCloseTopicFail();
+            } catch (Throwable t){

Review Comment:
   please do not catch Throwable blindly



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,46 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){
+            return CompletableFuture.completedFuture(null);
+        }
+        // If @param topic is not equals with cached, do nothing.
+        Topic topicInCache = createTopicFuture.getNow(Optional.empty()).orElse(null);
+        if (topicInCache == null || topicInCache != topic){
+            return CompletableFuture.completedFuture(null);
+        }
+        // Do remove topic reference.
+        return removeTopicFromCache(topicNameString, createTopicFuture);
+    }
+
+    public CompletableFuture<Void> removeTopicFromCache(String topic){
+        return removeTopicFromCache(topic, (CompletableFuture) null);
+    }
+
+    public CompletableFuture<Void> removeTopicFromCache(String topic, CompletableFuture createTopicFuture) {

Review Comment:
   please add generic type to CompletableFuture



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.thenApply(__ -> {

Review Comment:
   nit: we can use "whenComplete" instead of theApply/exceptionally



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){

Review Comment:
   why are we catching "Throwable" ? this is usually a bad practice



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,46 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){

Review Comment:
   we can save us from using getNow and we can chain the CompletableFuture with "thenCompose"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1219103700

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r958321455


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // Just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Exception t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();

Review Comment:
   Just to keep the logic the same as before: Restart rate-limiter after close managed ledger failure. Success is not guaranteed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // Just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Exception t){

Review Comment:
   Just to keep the logic the same as before: `close limiters` failure does not affect `topic close`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r958321455


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // Just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Exception t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();

Review Comment:
   -



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1219679035

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r927675700


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1918,15 +1918,38 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = getTopic(topicNameString, false);
+        return createTopicFuture.thenCompose(optionalTopic -> {
+            if (optionalTopic.isPresent() && optionalTopic.get() == topic){
+                return removeTopicFromCache(topicNameString, createTopicFuture);
+            }
+            // If topic is not in Cache, do nothing.
+            return CompletableFuture.completedFuture(null);
+        });

Review Comment:
   > We also should consider to avoid a topic can be closed more than once.
   
   I have appended the reason why `topic.close` was executed twice to the Motivation, and in this PR I've overwritten the `topic.close` to fix it. I also added a lock to the 'reset topic stat to UN-fenced' operation, could you review the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1239603835

   To make this PR easy to Review, it was split into two other PR:
   
   - Make method `topic.close` non-reentrant. Also prevent reentrant between `topic.close` and `topic.delete`.
     - fixed by PR #17524
   - Use cmd  `map.remove(key, value)` instead of `map.remove(key)` in implementation of `BrokerService. 
     - fixed by PR #17526


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] Repeat create same topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1167573753

   @gaoran10 @congbobo184 @Technoboy-  Please take a look. ^_^


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1220112506

   Hi @eolivelli 
   
   Could you review this PR again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r948576645


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.thenApply(__ -> {
+            resultFuture.complete(null);
+            return null;
+        }).exceptionally(exception -> {
+            log.error("[{}] Error closing topic", topic, exception);
+            // Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
+            // TODO Guarantee rate-limiter open finish.
+            try {
+                restartLimitersAfterCloseTopicFail();
+            } catch (Throwable t){

Review Comment:
   Already instead "Throwable" to "Exception". Thanks



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.thenApply(__ -> {
+            resultFuture.complete(null);
+            return null;
+        }).exceptionally(exception -> {
+            log.error("[{}] Error closing topic", topic, exception);
+            // Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
+            // TODO Guarantee rate-limiter open finish.

Review Comment:
   Already remove "TODO". Thanks



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.thenApply(__ -> {

Review Comment:
   Already use "whenComplete" instead of "theApply". Thanks



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){

Review Comment:
   Already instead "Throwable" to "Exception". Thanks



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.

Review Comment:
   Already remove.Thanks



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,46 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){
+            return CompletableFuture.completedFuture(null);
+        }
+        // If @param topic is not equals with cached, do nothing.
+        Topic topicInCache = createTopicFuture.getNow(Optional.empty()).orElse(null);
+        if (topicInCache == null || topicInCache != topic){
+            return CompletableFuture.completedFuture(null);
+        }
+        // Do remove topic reference.
+        return removeTopicFromCache(topicNameString, createTopicFuture);
+    }
+
+    public CompletableFuture<Void> removeTopicFromCache(String topic){
+        return removeTopicFromCache(topic, (CompletableFuture) null);
+    }
+
+    public CompletableFuture<Void> removeTopicFromCache(String topic, CompletableFuture createTopicFuture) {

Review Comment:
   Already add generic type to CompletableFuture. Thanks



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,46 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){

Review Comment:
   > I think that we should wait for this future to complete, otherwise it may create another Topic
   
   One comment suggestion here: "don't need to wait for the future complete" 
   https://github.com/apache/pulsar/pull/16247#issuecomment-1170101266
   
   > we can save us from using getNow and we can chain the CompletableFuture with "thenCompose"
   
   Already use "thenCompose" instead "getNow". Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1170103877

   @eolivelli @lhotari @michaeljmarshall @gaozhangmin @Jason918 @nicoloboschi   Please take a look, if you have time. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1170707786

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r927674969


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1918,15 +1918,38 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = getTopic(topicNameString, false);
+        return createTopicFuture.thenCompose(optionalTopic -> {
+            if (optionalTopic.isPresent() && optionalTopic.get() == topic){
+                return removeTopicFromCache(topicNameString, createTopicFuture);
+            }
+            // If topic is not in Cache, do nothing.
+            return CompletableFuture.completedFuture(null);
+        });

Review Comment:
   > We can use map.compute() to simplify the logic?  And looks like we don't need to wait for the future complete
   
   Yes, I've rewritten the logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1239604187

   To make this PR easy to Review, it was split into two other PR:
   
   - Make method `topic.close` non-reentrant. Also prevent reentrant between `topic.close` and `topic.delete`.
     - fixed by PR #17524
   - Use cmd  `map.remove(key, value)` instead of `map.remove(key)` in implementation of `BrokerService. 
     - fixed by PR #17526


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r957967551


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,50 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){
+            return CompletableFuture.completedFuture(null);
+        }
+        return createTopicFuture.thenCompose(topicOptional -> {
+            Topic topicInCache = topicOptional.orElse(null);
+            // If @param topic is not equals with cached, do nothing.
+            if (topicInCache == null || topicInCache != topic){
+                return CompletableFuture.completedFuture(null);
+            } else {
+                // Do remove.
+                return removeTopicFromCache(topicNameString, createTopicFuture);
+            }
+        });
+    }
+
+    public CompletableFuture<Void> removeTopicFromCache(String topic){
+        return removeTopicFromCache(topic, (CompletableFuture) null);

Review Comment:
   ```suggestion
           return removeTopicFromCache(topic, (CompletableFuture<Optional<Topic>>) null);
   ```
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // Just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Exception t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();

Review Comment:
   ```suggestion
           CompletableFuture<Void> closeClientsFuture = asyncCloseClients(boolean closeWithoutWaitingClientDisconnect);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // Just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Exception t){

Review Comment:
   why should catch the Exception?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // Just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Exception t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.whenComplete((__, ex) -> {
+            if (ex == null){
+                resultFuture.complete(null);
+            } else {
+                log.error("[{}] Error closing topic", topic, ex);
+                // Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
+                try {
+                    restartLimitersAfterCloseTopicFail();

Review Comment:
   why should catch the Exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r958321735


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // Just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Exception t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.whenComplete((__, ex) -> {
+            if (ex == null){
+                resultFuture.complete(null);
+            } else {
+                log.error("[{}] Error closing topic", topic, ex);
+                // Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
+                try {
+                    restartLimitersAfterCloseTopicFail();

Review Comment:
   Just to keep the logic the same as before: Restart rate-limiter after close managed ledger failure. Success is not guaranteed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] Repeat creates same topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1167713431

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16247: [fix] [broker] Repeat creates same topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#issuecomment-1168152495

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org