You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/17 16:04:09 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #16247: [fix] [broker] The broker has two identical Persitenttopics

eolivelli commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r948146674


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,46 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){

Review Comment:
   I think that we should wait for this future to complete, otherwise it may create another Topic



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.

Review Comment:
   please remove this kind of comments "I don't know"



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.thenApply(__ -> {
+            resultFuture.complete(null);
+            return null;
+        }).exceptionally(exception -> {
+            log.error("[{}] Error closing topic", topic, exception);
+            // Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
+            // TODO Guarantee rate-limiter open finish.

Review Comment:
   please to not leave "TODOs", open a new GH ticket and link it



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.thenApply(__ -> {
+            resultFuture.complete(null);
+            return null;
+        }).exceptionally(exception -> {
+            log.error("[{}] Error closing topic", topic, exception);
+            // Restart rate-limiter after close managed ledger failure. Success is not guaranteed.
+            // TODO Guarantee rate-limiter open finish.
+            try {
+                restartLimitersAfterCloseTopicFail();
+            } catch (Throwable t){

Review Comment:
   please do not catch Throwable blindly



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,46 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){
+            return CompletableFuture.completedFuture(null);
+        }
+        // If @param topic is not equals with cached, do nothing.
+        Topic topicInCache = createTopicFuture.getNow(Optional.empty()).orElse(null);
+        if (topicInCache == null || topicInCache != topic){
+            return CompletableFuture.completedFuture(null);
+        }
+        // Do remove topic reference.
+        return removeTopicFromCache(topicNameString, createTopicFuture);
+    }
+
+    public CompletableFuture<Void> removeTopicFromCache(String topic){
+        return removeTopicFromCache(topic, (CompletableFuture) null);
+    }
+
+    public CompletableFuture<Void> removeTopicFromCache(String topic, CompletableFuture createTopicFuture) {

Review Comment:
   please add generic type to CompletableFuture



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){
+            log.error("[{}] Error closing topic", topic, t);
+            unfenceTopicToResumeWithLock();
+            this.fullyCloseFuture.completeExceptionally(t);
+            return CompletableFuture.failedFuture(t);
+        }
+
+        // Close client components.
+        CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+        // Close managed ledger.
+        CompletableFuture<Void> closePhase2Future;
+        if (closeWithoutWaitingClientDisconnect){
+            closePhase2Future = asyncCloseLedger();
+        } else {
+            closePhase2Future = closeClientsFuture.thenCompose(__ -> asyncCloseLedger());
+        }
+        // Complete resultFuture. If managed ledger close failure, reset topic to resume.
+        closePhase2Future.thenApply(__ -> {

Review Comment:
   nit: we can use "whenComplete" instead of theApply/exceptionally



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,146 @@ public CompletableFuture<Void> close() {
     }
 
     /**
-     * Close this topic - close all producers and subscriptions associated with this topic.
-     *
+     * Close this topic - close all resources associated with this topic:
+     *   1.clients (producers and consumers) {@link #asyncCloseClients()}.
+     *   2.managed ledger {@link #asyncCloseLedger()}.
+     *   3.limiters {@link #closeLimiters()}.
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
         try {
             // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            if (isClosingOrDeleting){
+                if (closeWithoutWaitingClientDisconnect){
+                    return this.fullyCloseFuture;
+                } else {
+                    // Why not return this.fullyCloseFuture ?
+                    // I don't know, just keep the same implementation as before.
+                    log.warn("[{}] Topic is already being closed or deleted", topic);
+                    return CompletableFuture.failedFuture(new TopicFencedException("Topic is already fenced"));
+                }
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", topic);
-                closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                fenceTopicToCloseOrDelete();
+                this.fullyCloseFuture = new CompletableFuture<>();
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // Declare result.
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        // Close limiters.
+        try {
+            closeLimiters();
+        } catch (Throwable t){

Review Comment:
   why are we catching "Throwable" ? this is usually a bad practice



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,46 @@ public AuthorizationService getAuthorizationService() {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicNameString, Topic topic) {
+        if (topic == null){
+            return removeTopicFromCache(topicNameString);
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topicNameString);
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return CompletableFuture.completedFuture(null);
+        }
+        // We don't need to wait for the future complete, because we already have the topic reference here.
+        if (!createTopicFuture.isDone()){

Review Comment:
   we can save us from using getNow and we can chain the CompletableFuture with "thenCompose"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org