Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/12/17 18:26:18 UTC

[GitHub] [pulsar] merlimat opened a new pull request #8992: PIP-68: WaitForExclusive producer access mode

merlimat opened a new pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992


   ### Motivation
   
   Implemented the 2nd part of the proposal for PIP-68. 
   
   With `WaitForExclusive` mode, producer creation stays pending until no other producers are connected; the producer is then created. 
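
A minimal sketch of the queueing behaviour this mode implies, not the broker code from this PR. `Topic`, `addWaitForExclusive`, and `removeProducer` are hypothetical names, and FIFO promotion of waiting producers is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class WaitForExclusiveQueueSketch {
    // Hypothetical topic model: one active producer, the rest wait in FIFO order.
    static final class Topic {
        private static final class Waiter {
            final String name;
            final CompletableFuture<String> ready;

            Waiter(String name, CompletableFuture<String> ready) {
                this.name = name;
                this.ready = ready;
            }
        }

        private String activeProducer; // null when no producer is connected
        private final Queue<Waiter> waiting = new ArrayDeque<>();

        // Returns a future that completes once `name` holds exclusive access.
        synchronized CompletableFuture<String> addWaitForExclusive(String name) {
            CompletableFuture<String> ready = new CompletableFuture<>();
            if (activeProducer == null) {
                activeProducer = name;
                ready.complete(name);
            } else {
                waiting.add(new Waiter(name, ready)); // stays pending for now
            }
            return ready;
        }

        // When the active producer leaves, the next waiter (if any) is promoted.
        synchronized void removeProducer(String name) {
            if (!name.equals(activeProducer)) {
                return; // removing a still-waiting producer is not modelled here
            }
            Waiter next = waiting.poll();
            activeProducer = (next == null) ? null : next.name;
            if (next != null) {
                next.ready.complete(next.name);
            }
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        CompletableFuture<String> first = topic.addWaitForExclusive("producer-a");
        CompletableFuture<String> second = topic.addWaitForExclusive("producer-b");
        System.out.println("first ready: " + first.isDone());
        System.out.println("second ready: " + second.isDone());
        topic.removeProducer("producer-a");
        System.out.println("second ready after first left: " + second.isDone());
    }
}
```

The waiting producer's future is simply left incomplete, which is why the client-side request must not be timed out while it sits in the queue.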
   
   The change in the client logic makes sure that the producer creation request does not time out. The broker sends a first response saying that the producer creation is pending; at that point the client disables the timeout on the original request.
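
The timeout handling described above can be sketched roughly as follows. `TimedRequest` and `expireIfUnanswered` are hypothetical stand-ins, not the client's real `TimedCompletableFuture` or its timeout sweep; only the idea of a `markAsResponded()` flag is taken from the diff in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

class WaitForExclusiveTimeoutSketch {
    public static void main(String[] args) {
        // The broker answered "queued, not ready": the request stays pending.
        TimedRequest queued = new TimedRequest();
        queued.markAsResponded();
        queued.expireIfUnanswered();
        System.out.println("queued request done: " + queued.isDone());

        // The final confirmation arrives later and completes the same future.
        queued.complete("producer-ready");
        System.out.println("queued request result: " + queued.join());

        // A request that never got any response is failed by the sweep.
        TimedRequest silent = new TimedRequest();
        silent.expireIfUnanswered();
        System.out.println("silent request failed: " + silent.isCompletedExceptionally());
    }
}

// Hypothetical stand-in for a request future that is subject to a timeout.
class TimedRequest extends CompletableFuture<String> {
    private volatile boolean responded = false;

    // Called when the first "producer is queued" response is received.
    void markAsResponded() {
        responded = true;
    }

    // Called by a (hypothetical) periodic timeout sweep.
    // Returns true only if this call failed the request.
    boolean expireIfUnanswered() {
        if (responded || isDone()) {
            return false;
        }
        return completeExceptionally(new TimeoutException("no response from broker"));
    }
}
```

Keeping a single future for both responses means the caller sees one creation result, however long the producer waits in the queue.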


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet edited a comment on pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#issuecomment-799055572


   @merlimat we are adding docs. Should we create a new section named "Access mode" under the [Producer](https://pulsar.apache.org/docs/en/next/concepts-messaging/#producers) chapter and add the `Shared`, `Exclusive`, and `WaitForExclusive` modes there? 
   
   I've drafted a [doc](https://docs.google.com/document/d/1bjTZnHMC4PXV-nVh0wdlJSmntzkBS6fPX89VfrPIdt8/edit). Could you please help review it? Thanks for your answer.





[GitHub] [pulsar] Anonymitaet commented on pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#issuecomment-796385711


   @merlimat thanks for your great work. Would you like to add docs accordingly? Thanks.





[GitHub] [pulsar] merlimat commented on a change in pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#discussion_r546073971



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1192,6 +1193,17 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                                     }
                                     return null;
                                 });
+
+                                producerQueuedFuture.thenRun(() -> {
+                                    // If the producer is queued waiting, we will get an immediate notification
+                                    // that we need to pass to client
+                                    if (isActive()) {
+                                        log.info("[{}] Producer is waiting in qeuue: {}", remoteAddress, producer);

Review comment:
       Fixed







[GitHub] [pulsar] merlimat commented on a change in pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#discussion_r546073784



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1192,6 +1193,17 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                                     }
                                     return null;
                                 });
+
+                                producerQueuedFuture.thenRun(() -> {
+                                    // If the producer is queued waiting, we will get an immediate notification
+                                    // that we need to pass to client
+                                    if (isActive()) {
+                                        log.info("[{}] Producer is waiting in qeuue: {}", remoteAddress, producer);

Review comment:
       ```suggestion
                                           log.info("[{}] Producer is waiting in queue: {}", remoteAddress, producer);
   ```







[GitHub] [pulsar] merlimat commented on a change in pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#discussion_r546073335



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -468,6 +469,19 @@ protected void handleProducerSuccess(CommandProducerSuccess success) {
                     success.getRequestId(), success.getProducerName());
         }
         long requestId = success.getRequestId();
+        if (!success.getProducerReady()) {
+            // We got a success operation but the producer is not ready. This means that the producer has been queued up
+            // in broker. We need to leave the future pending until we get the final confirmation. We just mark that
+            // we have received a response, in order to avoid the timeout.
+            TimedCompletableFuture<?> requestFuture = (TimedCompletableFuture<?>) pendingRequests.get(requestId);
+            if (requestFuture != null) {
+                log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(),
+                        success.getProducerName(), requestId);
+                requestFuture.markAsResponded();

Review comment:
       Yes, marking the future only affects the timeout within a single connection. When a connection fails, everything that was pending on that connection is marked as failed, which triggers a retry.
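
A rough sketch of that behaviour, assuming a per-connection table of pending requests. `Connection`, `newRequest`, and `failAll` are hypothetical names used for illustration, not the `ClientCnx` API.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ConnectionFailureSketch {
    // Hypothetical per-connection table of in-flight requests, keyed by request id.
    static final class Connection {
        final Map<Long, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();

        CompletableFuture<String> newRequest(long requestId) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pendingRequests.put(requestId, future);
            return future;
        }

        // On connection loss every pending request fails, including requests
        // whose timeout was already disabled because the producer is queued.
        int failAll(Exception cause) {
            int failed = 0;
            for (CompletableFuture<String> future : pendingRequests.values()) {
                if (future.completeExceptionally(cause)) {
                    failed++;
                }
            }
            pendingRequests.clear();
            return failed;
        }
    }

    public static void main(String[] args) {
        Connection cnx = new Connection();
        CompletableFuture<String> queuedProducer = cnx.newRequest(1L);

        // Stand-in for the caller's retry logic.
        queuedProducer.exceptionally(err -> {
            System.out.println("retrying after: " + err.getMessage());
            return null;
        });

        int failed = cnx.failAll(new IOException("connection closed"));
        System.out.println("failed requests: " + failed);
    }
}
```

Because the failure path does not consult the responded flag, a queued producer cannot wait forever on a dead connection.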







[GitHub] [pulsar] eolivelli commented on a change in pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#discussion_r546070668



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1192,6 +1193,17 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                                     }
                                     return null;
                                 });
+
+                                producerQueuedFuture.thenRun(() -> {
+                                    // If the producer is queued waiting, we will get an immediate notification
+                                    // that we need to pass to client
+                                    if (isActive()) {
+                                        log.info("[{}] Producer is waiting in qeuue: {}", remoteAddress, producer);

Review comment:
       Typo: `qeuue`

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -468,6 +469,19 @@ protected void handleProducerSuccess(CommandProducerSuccess success) {
                     success.getRequestId(), success.getProducerName());
         }
         long requestId = success.getRequestId();
+        if (!success.getProducerReady()) {
+            // We got a success operation but the producer is not ready. This means that the producer has been queued up
+            // in broker. We need to leave the future pending until we get the final confirmation. We just mark that
+            // we have received a response, in order to avoid the timeout.
+            TimedCompletableFuture<?> requestFuture = (TimedCompletableFuture<?>) pendingRequests.get(requestId);
+            if (requestFuture != null) {
+                log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(),
+                        success.getProducerName(), requestId);
+                requestFuture.markAsResponded();

Review comment:
       What happens if the PulsarClient gets closed or there is a network error?
   Are we guaranteed to fail rather than wait forever?







[GitHub] [pulsar] Anonymitaet edited a comment on pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#issuecomment-799055572


   @merlimat we are adding docs. Should we create a new section named "Access mode" under the [Producer](https://pulsar.apache.org/docs/en/next/concepts-messaging/#producers) chapter and add the `Shared`, `Exclusive`, and `WaitForExclusive` modes there? 
   
   I've drafted [docs](https://docs.google.com/document/d/1bjTZnHMC4PXV-nVh0wdlJSmntzkBS6fPX89VfrPIdt8/edit). Could you please help review them? Thanks for your answer.





[GitHub] [pulsar] Anonymitaet edited a comment on pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#issuecomment-799055572









[GitHub] [pulsar] Anonymitaet commented on pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#issuecomment-799055572


   @merlimat we are planning to add docs. Should we add the `WaitForExclusive` mode to the [Subscriptions](https://pulsar.apache.org/docs/en/next/concepts-messaging/#subscriptions) section (so that we have 5 subscription modes in Pulsar)? Thanks for your answer.





[GitHub] [pulsar] Anonymitaet edited a comment on pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#issuecomment-796385711


   @merlimat thanks for your great work. Would you like to add docs accordingly? Then I can help review. Thanks.
   





[GitHub] [pulsar] Anonymitaet commented on pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#issuecomment-800064926


   @merlimat could you please take a look at the [draft](https://docs.google.com/document/d/1bjTZnHMC4PXV-nVh0wdlJSmntzkBS6fPX89VfrPIdt8/edit)? Thanks.





[GitHub] [pulsar] Anonymitaet edited a comment on pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#issuecomment-799055572


   @merlimat we are adding docs. Should we create a new section named "Access mode" under the [Producer](https://pulsar.apache.org/docs/en/next/concepts-messaging/#producers) chapter and add the `WaitForExclusive` mode there? Thanks for your answer.





[GitHub] [pulsar] eolivelli commented on a change in pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#discussion_r546212769



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -468,6 +469,19 @@ protected void handleProducerSuccess(CommandProducerSuccess success) {
                     success.getRequestId(), success.getProducerName());
         }
         long requestId = success.getRequestId();
+        if (!success.getProducerReady()) {
+            // We got a success operation but the producer is not ready. This means that the producer has been queued up
+            // in broker. We need to leave the future pending until we get the final confirmation. We just mark that
+            // we have received a response, in order to avoid the timeout.
+            TimedCompletableFuture<?> requestFuture = (TimedCompletableFuture<?>) pendingRequests.get(requestId);
+            if (requestFuture != null) {
+                log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(),
+                        success.getProducerName(), requestId);
+                requestFuture.markAsResponded();

Review comment:
       Good 







[GitHub] [pulsar] wuzhanpeng commented on a change in pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
wuzhanpeng commented on a change in pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#discussion_r708806589



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -337,14 +342,15 @@ public String getReplicatorPrefix() {
     }
 
     @Override
-    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer,
+            CompletableFuture<Void> producerQueuedFuture) {
         checkArgument(producer.getTopic() == this);
 
         CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
 
-        incrementTopicEpochIfNeeded(producer)
-                .thenAccept(epoch -> {
-                    lock.readLock().lock();

Review comment:
       Hi @merlimat @sijie, this part puzzles me a little. Why do we need to change the read lock to a write lock here?
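
The thread does not record an answer to this question. One plausible reading, offered only as an assumption, is that exclusive access turns producer registration into a check-then-act on shared state, which a shared read lock cannot make atomic. The sketch below illustrates that general pattern; `Topic`, `tryAddExclusive`, and the fields are hypothetical and are not the `AbstractTopic` code.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ExclusiveCheckLockSketch {
    // Hypothetical topic model with an exclusive-producer flag.
    static final class Topic {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final Set<String> producers = new HashSet<>();
        private boolean hasExclusiveProducer;

        // The emptiness check and the update must happen atomically. Under a
        // read lock, two callers could both pass the check before either updates.
        boolean tryAddExclusive(String name) {
            lock.writeLock().lock();
            try {
                if (hasExclusiveProducer || !producers.isEmpty()) {
                    return false;
                }
                hasExclusiveProducer = true;
                producers.add(name);
                return true;
            } finally {
                lock.writeLock().unlock();
            }
        }

        // Pure reads can still share the read lock.
        int producerCount() {
            lock.readLock().lock();
            try {
                return producers.size();
            } finally {
                lock.readLock().unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Topic topic = new Topic();
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            String name = "producer-" + i;
            threads[i] = new Thread(() -> {
                if (topic.tryAddExclusive(name)) {
                    winners.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("exclusive winners: " + winners.get());
        System.out.println("registered producers: " + topic.producerCount());
    }
}
```

With the write lock held across the check and the update, exactly one of the competing threads can win exclusive access.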







[GitHub] [pulsar] Anonymitaet edited a comment on pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992#issuecomment-799055572


   @merlimat we are planning to add docs. Should we create a new section named "Access mode" under the [Producer](https://pulsar.apache.org/docs/en/next/concepts-messaging/#producers) chapter and add the `WaitForExclusive` mode there? Thanks for your answer.





[GitHub] [pulsar] merlimat merged pull request #8992: PIP-68: WaitForExclusive producer access mode

Posted by GitBox <gi...@apache.org>.
merlimat merged pull request #8992:
URL: https://github.com/apache/pulsar/pull/8992


   

