You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/18 13:39:30 UTC

[GitHub] [pulsar] Jason918 opened a new pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Jason918 opened a new pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874


   ### Motivation
   
   Currently, when broker receive "Producer" command,  it will check the topic if "isBacklogQuotaExceeded".
   While in `PersistentTopic#isBacklogQuotaExceeded`, `isTimeBacklogExceeded` is used, in which will turns to a blocking operation if "isPreciseTimeBasedBacklogQuotaCheck" is set as true. 
   
   The blocking operations in pulsar io threads may impact broker performance, this PR optimized this blocking procedure to async mode.
   
   ### Modifications
   
   Add "CompletableFuture<Boolean> checkTimeBacklogExceeded()" in PersistentTopic for the async check procedure.
   Update corresponding method calls to async mode in `ServerCnx#handleProducer`.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   
   This change is already covered by existing tests, such as org.apache.pulsar.broker.admin.TopicPoliciesTest
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
     
   - [x] `no-need-doc` 
   
   Only code optimize. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-973921549


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-976410726


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli merged pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#discussion_r758649825



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2518,17 +2528,26 @@ public boolean isSizeBacklogExceeded() {
      * @return determine if backlog quota enforcement needs to be done for topic based on time limit
      */
     public boolean isTimeBacklogExceeded() {
+        try {
+            return checkTimeBacklogExceeded().get();
+        } catch (Throwable e) {

Review comment:
       The main problem with this approach is that it completely hides from the caller the fact that the method is internally blocking. We have many other examples of this in the code base




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-1018161130


   @eolivelli Please help review this PR again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#discussion_r786545409



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1188,69 +1189,75 @@ protected void handleProducer(final CommandProducer cmdProducer) {
 
             log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
 
-            service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
+            service.getOrCreateTopic(topicName.toString()).thenCompose((Topic topic) -> {
                 // Before creating producer, check if backlog quota exceeded
                 // on topic for size based limit and time based limit
-                for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
-                    if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic
-                                .getBacklogQuota(backlogQuotaType).getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError,
-                                    illegalStateException.getMessage());
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededException,
-                                    illegalStateException.getMessage());
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
+                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
+                backlogQuotaCheckFuture.exceptionally(throwable -> {

Review comment:
       @codelipenghui Moved TopicBacklogQuotaExceededException handling outside. PTAL.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-974057408


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#discussion_r755682655



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1189,65 +1190,70 @@ protected void handleProducer(final CommandProducer cmdProducer) {
             service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
                 // Before creating producer, check if backlog quota exceeded
                 // on topic for size based limit and time based limit
-                for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
-                    if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic
-                                .getBacklogQuota(backlogQuotaType).getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError,
-                                    illegalStateException.getMessage());
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededException,
-                                    illegalStateException.getMessage());
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
+                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
+                backlogQuotaCheckFuture.exceptionally(throwable -> {
+                    //throwable should be CompletionException holding TopicBacklogQuotaExceededException
+                    BrokerServiceException.TopicBacklogQuotaExceededException exception =
+                            (BrokerServiceException.TopicBacklogQuotaExceededException) throwable.getCause();
+                    IllegalStateException illegalStateException = new IllegalStateException(exception);
+                    BacklogQuota.RetentionPolicy retentionPolicy = exception.getRetentionPolicy();
+                    if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
+                        commandSender.sendErrorResponse(requestId,
+                                ServerError.ProducerBlockedQuotaExceededError,
+                                illegalStateException.getMessage());
+                    } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
+                        commandSender.sendErrorResponse(requestId,
+                                ServerError.ProducerBlockedQuotaExceededException,
+                                illegalStateException.getMessage());
                     }
-                }
-                // Check whether the producer will publish encrypted messages or not
-                if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
-                    String msg = String.format("Encryption is required in %s", topicName);
-                    log.warn("[{}] {}", remoteAddress, msg);
-                    commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
+                    producerFuture.completeExceptionally(illegalStateException);
                     producers.remove(producerId, producerFuture);
-                    return;
-                }
+                    return null;
+                });
 
-                disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
+                backlogQuotaCheckFuture.thenRun(() -> {

Review comment:
       The trigger thread is the same as the above `backlogQuotaCheckFuture.exceptionally`, it's current thread or ReadEntry call back thread.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#discussion_r755527273



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1189,65 +1190,70 @@ protected void handleProducer(final CommandProducer cmdProducer) {
             service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
                 // Before creating producer, check if backlog quota exceeded
                 // on topic for size based limit and time based limit
-                for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
-                    if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic
-                                .getBacklogQuota(backlogQuotaType).getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError,
-                                    illegalStateException.getMessage());
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededException,
-                                    illegalStateException.getMessage());
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
+                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
+                backlogQuotaCheckFuture.exceptionally(throwable -> {
+                    //throwable should be CompletionException holding TopicBacklogQuotaExceededException
+                    BrokerServiceException.TopicBacklogQuotaExceededException exception =
+                            (BrokerServiceException.TopicBacklogQuotaExceededException) throwable.getCause();
+                    IllegalStateException illegalStateException = new IllegalStateException(exception);
+                    BacklogQuota.RetentionPolicy retentionPolicy = exception.getRetentionPolicy();
+                    if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
+                        commandSender.sendErrorResponse(requestId,
+                                ServerError.ProducerBlockedQuotaExceededError,
+                                illegalStateException.getMessage());
+                    } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
+                        commandSender.sendErrorResponse(requestId,
+                                ServerError.ProducerBlockedQuotaExceededException,
+                                illegalStateException.getMessage());
                     }
-                }
-                // Check whether the producer will publish encrypted messages or not
-                if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
-                    String msg = String.format("Encryption is required in %s", topicName);
-                    log.warn("[{}] {}", remoteAddress, msg);
-                    commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
+                    producerFuture.completeExceptionally(illegalStateException);
                     producers.remove(producerId, producerFuture);
-                    return;
-                }
+                    return null;
+                });
 
-                disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
+                backlogQuotaCheckFuture.thenRun(() -> {

Review comment:
       Which thread will execute this code?
   
   The current thread (in case we reach here and the futures are already completed) or some other thread on completion of the one of the two futures above.
   
   I am not sure we have control over what's happening here

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2518,17 +2528,26 @@ public boolean isSizeBacklogExceeded() {
      * @return determine if backlog quota enforcement needs to be done for topic based on time limit
      */
     public boolean isTimeBacklogExceeded() {
+        try {
+            return checkTimeBacklogExceeded().get();
+        } catch (Throwable e) {

Review comment:
       This is a code smell

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1189,65 +1190,70 @@ protected void handleProducer(final CommandProducer cmdProducer) {
             service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
                 // Before creating producer, check if backlog quota exceeded
                 // on topic for size based limit and time based limit
-                for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
-                    if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic
-                                .getBacklogQuota(backlogQuotaType).getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError,
-                                    illegalStateException.getMessage());
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededException,
-                                    illegalStateException.getMessage());
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
+                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
+                backlogQuotaCheckFuture.exceptionally(throwable -> {
+                    //throwable should be CompletionException holding TopicBacklogQuotaExceededException
+                    BrokerServiceException.TopicBacklogQuotaExceededException exception =

Review comment:
       What happens if there is something else?
   A ClassCastException or a NPE.
   This will break the system 
   
   I believe we should handle the case in which this expectation is not met

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1189,65 +1190,70 @@ protected void handleProducer(final CommandProducer cmdProducer) {
             service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
                 // Before creating producer, check if backlog quota exceeded
                 // on topic for size based limit and time based limit
-                for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
-                    if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic
-                                .getBacklogQuota(backlogQuotaType).getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError,
-                                    illegalStateException.getMessage());
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededException,
-                                    illegalStateException.getMessage());
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
+                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
+                backlogQuotaCheckFuture.exceptionally(throwable -> {

Review comment:
       Which thread will execute this code?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-1015347389


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 closed pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 closed pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli merged pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-1015112560


   @eolivelli Please help review this PR again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-986475232


   @eolivelli PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-978786483


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-975547013


   @Jason918 Do you want the resolve the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#discussion_r755682022



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1189,65 +1190,70 @@ protected void handleProducer(final CommandProducer cmdProducer) {
             service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
                 // Before creating producer, check if backlog quota exceeded
                 // on topic for size based limit and time based limit
-                for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
-                    if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic
-                                .getBacklogQuota(backlogQuotaType).getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError,
-                                    illegalStateException.getMessage());
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededException,
-                                    illegalStateException.getMessage());
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
+                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
+                backlogQuotaCheckFuture.exceptionally(throwable -> {

Review comment:
       I am changing the outer thenAccept to thenCompose, the outside `exceptionally` will handle all unexpected exceptions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#discussion_r755683362



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2518,17 +2528,26 @@ public boolean isSizeBacklogExceeded() {
      * @return determine if backlog quota enforcement needs to be done for topic based on time limit
      */
     public boolean isTimeBacklogExceeded() {
+        try {
+            return checkTimeBacklogExceeded().get();
+        } catch (Throwable e) {

Review comment:
       This is just for keeping the same behavior of `isTimeBacklogExceeded` as before. This method is called somewhere else.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-974138652


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-974576062


   @codelipenghui @hangc0276 @congbobo184  Please help take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#discussion_r786449282



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1188,69 +1189,75 @@ protected void handleProducer(final CommandProducer cmdProducer) {
 
             log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
 
-            service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
+            service.getOrCreateTopic(topicName.toString()).thenCompose((Topic topic) -> {
                 // Before creating producer, check if backlog quota exceeded
                 // on topic for size based limit and time based limit
-                for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
-                    if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic
-                                .getBacklogQuota(backlogQuotaType).getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError,
-                                    illegalStateException.getMessage());
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededException,
-                                    illegalStateException.getMessage());
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
+                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
+                backlogQuotaCheckFuture.exceptionally(throwable -> {

Review comment:
       `thenCompose` return the `backlogQuotaCheckFuture` to the next stage, is it better to handle the `TopicBacklogQuotaExceededException` here https://github.com/apache/pulsar/pull/12874/files#diff-1e0e8195fb5ec5a6d79acbc7d859c025a9b711f94e6ab37c94439e99b3202e84R1262 ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-973870356


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-973682140


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-975617045


   > @Jason918 Do you want the resolve the conflicts?
   
   Resolved, sorry for missing this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#discussion_r755675993



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1189,65 +1190,70 @@ protected void handleProducer(final CommandProducer cmdProducer) {
             service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
                 // Before creating producer, check if backlog quota exceeded
                 // on topic for size based limit and time based limit
-                for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
-                    if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic
-                                .getBacklogQuota(backlogQuotaType).getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError,
-                                    illegalStateException.getMessage());
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            commandSender.sendErrorResponse(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededException,
-                                    illegalStateException.getMessage());
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
+                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
+                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
+                backlogQuotaCheckFuture.exceptionally(throwable -> {
+                    //throwable should be CompletionException holding TopicBacklogQuotaExceededException
+                    BrokerServiceException.TopicBacklogQuotaExceededException exception =

Review comment:
       Two situations:
   1. If `preciseTimeBasedBacklogQuotaCheck` set as true (default is false) and some other constrains met,  this will be execute in the callback thread of `ManagedLedgerImpl#asyncReadEntry(...)`. 
   2. Otherwise everything is executed in previous thread. Nothing async.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#issuecomment-977561264


   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12874: [broker] Optimize blocking backlogQuotaCheck to non-blocking in ServerCnx#handleProducer

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12874:
URL: https://github.com/apache/pulsar/pull/12874#discussion_r758873895



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2518,17 +2528,26 @@ public boolean isSizeBacklogExceeded() {
      * @return determine if backlog quota enforcement needs to be done for topic based on time limit
      */
     public boolean isTimeBacklogExceeded() {
+        try {
+            return checkTimeBacklogExceeded().get();
+        } catch (Throwable e) {

Review comment:
       I get it. Thx.
   I will remove this `isTimeBacklogExceeded`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org