Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/05 01:40:00 UTC

[GitHub] [pulsar] leiless opened a new issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless opened a new issue #10832:
URL: https://github.com/apache/pulsar/issues/10832


   ## `send()`
   
   > `MessageId send(T message) throws PulsarClientException`
   > Sends a message.
   > This call will be blocking until it is successfully **acknowledged** by the Pulsar broker.
   > ...
   > **Returns**:
   > the message id assigned to the published message
   > ...
   > https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Producer.html#send-T-
   
   ## `sendAsync()`
   > `CompletableFuture<MessageId> sendAsync(T message)`
   > Send a message asynchronously
   > When the producer queue is full, by default this method will complete the future with an exception `PulsarClientException.ProducerQueueIsFullError`
   > ...
   > **Returns**:
   > a future that can be used to track when the message will have been safely persisted
   > https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Producer.html#sendAsync-T-
   
   Hi Pulsar developers. Suppose I have `N` messages to be sent by a producer.
   For the first `N-1` messages, I use `sendAsync()` and ignore the returned future (`CompletableFuture<MessageId>`).
   For the last (`N`-th) message, I use `send()` to ensure that it is successfully **acknowledged** (persisted to disk) by the Pulsar broker.
   
   My question is: does an ACK for the `N`-th message imply that its predecessors (the first `N-1` messages) have also been acknowledged (persisted to disk) by the broker, given that the message queue order has to remain FIFO? A rough sketch of the pattern follows the config list below.
   
   My config:
   * One producer
   * One subscription in exclusive mode
   * One consumer
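   
   To make the pattern concrete, a minimal sketch of what I mean (the `client`, topic name, payloads and `n` here are placeholders, not my real code):
   
   ```java
   Producer<byte[]> producer = client.newProducer()
           .topic("my-topic")
           .create();
   
   for (int i = 0; i < n - 1; i++) {
       // Fire-and-forget: the returned CompletableFuture<MessageId> is ignored on purpose.
       producer.sendAsync(("msg-" + i).getBytes());
   }
   
   // Blocking send for the last message; it returns only after the broker has ACKed it.
   MessageId lastId = producer.send(("msg-" + (n - 1)).getBytes());
   // Does a successful `lastId` imply that messages 0..n-2 are persisted as well?
   ```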





[GitHub] [pulsar] codelipenghui commented on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

codelipenghui commented on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-1058889638


   The issue has had no activity for 30 days; marking it with the Stale label.





[GitHub] [pulsar] leiless commented on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless commented on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-858341525


   Code example
   
   ```java
           List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
           for (int i = 0; i < numMsgs; i++) {
               byte[] message = ("msg-" + i).getBytes();
               sendFutureList.add(producer.sendAsync(message));
               if ((i + 1) % numMsgsInBatch == 0) {
                   producer.flush();
                   LOG.info("Flush {} messages", (i + 1));
               }
           }
           FutureUtil.waitForAll(sendFutureList).get();
   ```
   
   https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java#L340-L349





[GitHub] [pulsar] leiless edited a comment on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless edited a comment on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856694634


   @lhotari, many thanks for your reply.
   
   For the [`flush()`](https://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer.html#flush--) method, my use case turns out to be:
   * Producer `.maxPendingMessages(10_000)`
   * Producer `.blockIfQueueFull(true)`
   * I have many messages to be sent, publish throughput `>= 10_000 msgs/s`
   * Use `sendAsync()` for the first `N-1` messages and **still** ignore the `CompletableFuture<MessageId>`
   * **[NEW]** Use `flush()` after the last (`N`-th) message to ensure that all `N` messages are persisted in the Pulsar broker (rough sketch below).
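   
   Roughly, the producer side looks like this (sketch only; the `client`, topic name and `batch` are placeholders):
   
   ```java
   Producer<byte[]> producer = client.newProducer()
           .topic("my-topic")              // placeholder topic
           .maxPendingMessages(10_000)     // bound the client-side pending queue
           .blockIfQueueFull(true)         // sendAsync() blocks instead of failing when the queue is full
           .create();
   
   for (byte[] payload : batch) {
       producer.sendAsync(payload);        // the returned future is (still) ignored here
   }
   producer.flush();                       // block until everything buffered so far has been persisted
   ```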
   
   My question is: is the above solution fine to use? Does it conform to best practice under such a heavy load?
   The biggest problem is that I read bulk data from an infinite stream, which is very fast (`>= 60_000 items/s`), and then send that data to the Pulsar broker.
   If I use `send()`, the publish throughput would be extremely degraded.





[GitHub] [pulsar] leiless commented on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless commented on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-857300418


   > I believe that with `.blockIfQueueFull(true)` the chance of ordering problems is not a big concern.
   
   According to the docs for [`flush()`](https://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer.html#flush--):
   
   > Flush all the messages **buffered in the client** and wait until all messages have been successfully persisted.
   
   If a message has already failed, it won't be in the buffered list, since it has already been handled. So if I check the `CompletableFuture` and retry the send after `flush()`, the order might be broken?
   
   Since the messages sent before the retry have already been persisted in the Pulsar broker, if there is any subscription, those messages may already have been consumed and acknowledged by the consumer (assuming `exclusive` mode).
   If the retry then succeeds (i.e. is persisted in the Pulsar broker), the order seems broken?
   
   One thing worth mentioning:
   I've set `sequenceId()` for each message; in my use case, the `seqId` is monotonically increasing.
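   
   For reference, a rough sketch of how each message gets its sequence id (the `nextSequenceId()` helper is hypothetical; the real code derives it from the input stream):
   
   ```java
   long seqId = nextSequenceId();          // monotonically increasing in my use case
   producer.newMessage()
           .sequenceId(seqId)
           .value(payload)
           .sendAsync();
   ```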





[GitHub] [pulsar] lhotari commented on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

lhotari commented on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856716926


   > What if an asynchronously sent message failed, say the N/2-th message, and I retry sending it: can the consumer still read the messages in order, eventually? Is there any chance the messages end up out of order?
   
   I believe that with `.blockIfQueueFull(true)` the chance of ordering problems is not a big concern. However, there might not be a guarantee that message order is preserved if the results of `sendAsync` aren't checked before sending the next message. It's worth verifying with tests what your experience is.
   With the default setting of `.blockIfQueueFull(false)` and using `sendAsync`, there's a real challenge in handling the queue-full exceptions, which are used as a flow-control mechanism.
   
   > And why do we need to verify after flush()? Do you mean it flushes all buffered in-memory messages so they are safely persisted in the Pulsar broker, and that failed messages won't be in the buffered to-be-sent list, so you have to retry them?
   
   Flush can succeed regardless of any possible previous failures in message sending. The only way to ensure that the messages have been sent is to check the results. This could happen after calling flush().
   
   Perhaps there are better ways to ensure message delivery for batches when using transactions with Pulsar. However, I haven't tried that myself.
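   
   An untested sketch of what that might look like with the transaction API (classes from `org.apache.pulsar.client.api`; the service URL, topic, timeouts and `numMsgs` are placeholders, and transactions need to be enabled on the broker):
   
   ```java
   PulsarClient client = PulsarClient.builder()
           .serviceUrl("pulsar://localhost:6650")    // placeholder
           .enableTransaction(true)
           .build();
   
   Producer<byte[]> producer = client.newProducer()
           .topic("my-topic")                        // placeholder
           .sendTimeout(0, TimeUnit.SECONDS)         // send timeout has to be disabled for transactional sends
           .create();
   
   Transaction txn = client.newTransaction()
           .withTransactionTimeout(5, TimeUnit.MINUTES)
           .build()
           .get();
   
   List<CompletableFuture<MessageId>> futures = new ArrayList<>();
   for (int i = 0; i < numMsgs; i++) {
       futures.add(producer.newMessage(txn).value(("msg-" + i).getBytes()).sendAsync());
   }
   // Commit only if every send succeeded; otherwise abort and retry the whole batch.
   CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
   txn.commit().get();
   ```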





[GitHub] [pulsar] leiless edited a comment on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless edited a comment on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856702215


   > One possibility is to add them to a map/list and validate that all completable futures were successfully completed as the last step, after your `flush()` step.
   
   What if an asynchronously sent message failed, say the `N/2`-th message, and I retry sending it: can the consumer still read the messages in order, eventually? Is there any chance the messages end up out of order?
   
   And why do we need to verify after `flush()`? Do you mean it flushes all buffered in-memory messages so they are safely persisted in the Pulsar broker, and that failed messages won't be in the buffered to-be-sent list, so you have to retry them?





[GitHub] [pulsar] leiless commented on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless commented on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856702215


   > One possibility is to add them to a map/list and validate that all completable futures were successfully completed as the last step, after your `flush()` step.
   
   What if an asynchronously sent message failed, say the `N/2`-th message, and I retry sending it: can the consumer still read the messages in order, eventually?
   
   And why do we need to verify after `flush()`? Do you mean it flushes all buffered in-memory messages so they are safely persisted in the Pulsar broker, and that failed messages won't be in the buffered to-be-sent list, so you have to retry them?





[GitHub] [pulsar] lhotari edited a comment on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

lhotari edited a comment on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856677788


   I believe that the Producer [flush](https://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer.html#flush--) and [flushAsync](https://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer.html#flushAsync--) methods exist for the use case that you have described.
   
   When using the async API, one must ensure the `CompletableFuture`s returned from `sendAsync` are checked for errors, since the call to `flushAsync` will succeed even if one of the messages fails.
   
   Another challenge is producer flow control (back pressure). By default, the future returned from `sendAsync` will complete with an exception when the producer queue is full. It's possible to change this behavior in the producer settings in the `ProducerBuilder` with `.blockIfQueueFull(true)`, but this has the downside of making `sendAsync` a blocking operation when the queue is full. For some solutions this might be fine.
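   
   For example, with the default `.blockIfQueueFull(false)` something along these lines is needed to react to the queue-full case (sketch only; the back-off/retry policy is up to the application):
   
   ```java
   producer.sendAsync(payload).exceptionally(ex -> {
       // With blockIfQueueFull(false), the future fails with ProducerQueueIsFullError
       // when the client-side pending-message queue is full.
       if (ex instanceof PulsarClientException.ProducerQueueIsFullError
               || ex.getCause() instanceof PulsarClientException.ProducerQueueIsFullError) {
           // Queue is full: back off and retry, or slow down the source.
       } else {
           // Some other send failure: log it, retry it, or fail the batch.
       }
       return null;
   });
   ```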





[GitHub] [pulsar] lhotari commented on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

lhotari commented on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856697243


   > My question is: is the above solution fine to use?
   
   If you don't care whether message sending fails, it's fine to do so. If you want to ensure the delivery of sent messages, you cannot ignore the `CompletableFuture<MessageId>` instances returned from `sendAsync`. One possibility is to add them to a map/list and validate that all completable futures completed successfully as the last step, after your `flush()` step.
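   
   Something along these lines (sketch only; `batch` is a placeholder, and how to react to a failed send is up to your application):
   
   ```java
   List<CompletableFuture<MessageId>> futures = new ArrayList<>();
   for (byte[] payload : batch) {
       futures.add(producer.sendAsync(payload));
   }
   producer.flush();   // everything buffered so far is now persisted (or has failed)
   
   // Last step: make sure none of the individual sends failed.
   try {
       CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
   } catch (CompletionException e) {
       // At least one message was not persisted; decide whether to retry or abort.
   }
   ```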





[GitHub] [pulsar] lhotari commented on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

lhotari commented on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856677788


   I believe that the Producer [flush](https://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer.html#flush--) and [flushAsync](https://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer.html#flushAsync--) methods exist for the use case that you have described.
   
   When using the async API, one must ensure the `CompletableFuture`s returned from `sendAsync` are checked for errors, since the call to `flushAsync` will succeed even if one of the messages fails.
   
   Another challenge is producer flow control (back pressure). By default, the future returned from `sendAsync` will complete with an exception when the producer queue is full. It's possible to change this behavior in the producer settings in the `ProducerBuilder` with `.blockIfQueueFull(true)`, but this has the downside of making `sendAsync` a blocking operation.





[GitHub] [pulsar] leiless commented on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless commented on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-855166878


   Instead of an `fsync(2)`-per-message manner, I'm pursuing `fsync(2)` in batch (as a whole).
   Explicitly: `fsync(2)` only for the last message in a list.
   Would that enforce the previous messages to be persisted to disk as well?
   
   (Roughly speaking, can you treat `fsync(2)` as the ACK here?)
   
   ```
       Before:
       
           +-------+-------+-------+-------+
           | Msg N |  ...  | Msg 2 | Msg 1 | -> Head up
           | fsync | fsync | fsync | fsync |
           +-------+-------+-------+-------+
       
       After:
           +-------+-------+-------+-------+
           | Msg N |  ...  | Msg 2 | Msg 1 | -> Head up
           |*fsync*|       |       |       |
           +-------+-------+-------+-------+
   ```
   
   The idea is somewhat like [Consumer.acknowledgeCumulative(MessageId)](https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Consumer.html#acknowledgeCumulative-org.apache.pulsar.client.api.MessageId-) but on the Producer side.
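   
   That is, something like the following on the consumer side, but with the equivalent guarantee on the producing side (consumer-side sketch for comparison only):
   
   ```java
   Message<byte[]> msg = consumer.receive();
   // Acknowledges msg and every message before it on this subscription in one call.
   consumer.acknowledgeCumulative(msg.getMessageId());
   ```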





[GitHub] [pulsar] leiless edited a comment on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless edited a comment on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-858341525


   Code example
   
   ```java
           List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
           for (int i = 0; i < numMsgs; i++) {
               byte[] message = ("msg-" + i).getBytes();
               sendFutureList.add(producer.sendAsync(message));
               if ((i + 1) % numMsgsInBatch == 0) {
                   producer.flush();
                   LOG.info("Flush {} messages", (i + 1));
               }
           }
           FutureUtil.waitForAll(sendFutureList).get();
   ```
   
   https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java#L340-L349
   https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java#L36-L44





[GitHub] [pulsar] leiless commented on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless commented on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856694634


   @lhotari, many thanks for your reply.
   
   For the [`flush()`](https://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer.html#flush--) method, my use case turns out to be:
   * Producer `.maxPendingMessages(10_000)`
   * Producer `.blockIfQueueFull(true)`
   * I have many messages to be sent, publish throughput `>= 10_000 msgs/s`
   * Use `sendAsync()` for the first `N-1` messages and **still** ignore the `CompletableFuture<MessageId>`
   * **[NEW]** Use `flush()` after the last (`N`-th) message to ensure that all `N` messages are persisted in the Pulsar broker.
   
   My question is: is the above solution fine to use?
   The biggest problem is that I read bulk data from an infinite stream, which is very fast (`>= 60_000 items/s`), and then send that data to the Pulsar broker.
   If I use `send()`, the publish throughput would be extremely degraded.





[GitHub] [pulsar] lhotari edited a comment on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

lhotari edited a comment on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856716926


   > What if an asynchronously sent message failed, say the N/2-th message, and I retry sending it: can the consumer still read the messages in order, eventually? Is there any chance the messages end up out of order?
   
   I believe that with `.blockIfQueueFull(true)` the chance of ordering problems is not a big concern. However, there might not be a guarantee that message order is preserved if the results of `sendAsync` aren't checked before sending the next message. It's worth verifying with tests what your experience is.
   With the default setting of `.blockIfQueueFull(false)` and using `sendAsync`, there's a real challenge in handling the queue-full exceptions, which are used as a flow-control mechanism.
   
   > And why do we need to verify after flush()? Do you mean it flushes all buffered in-memory messages so they are safely persisted in the Pulsar broker, and that failed messages won't be in the buffered to-be-sent list, so you have to retry them?
   
   Flush can succeed regardless of any possible previous failures in message sending. The only way to ensure that the messages have been sent is to check the results. Checking the CompletableFuture instances could happen after calling flush().
   
   Perhaps there are better ways to ensure message delivery for batches when using transactions with Pulsar. However, I haven't tried that myself.





[GitHub] [pulsar] leiless edited a comment on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless edited a comment on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856694634


   @lhotari, many thanks for your reply.
   
   For the [`flush()`](https://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer.html#flush--) method, my use case turns out to be:
   * Producer `.maxPendingMessages(10_000)`
   * Producer `.blockIfQueueFull(true)`
   * I have many messages to be sent, publish throughput `>= 10_000 msgs/s`
   * Use `sendAsync()` for the first `N-1` messages and **still** ignore the `CompletableFuture<MessageId>`
   * **[NEW]** Use `sendAsync()` followed by `flush()` for the last (`N`-th) message to ensure that all `N` messages are persisted in the Pulsar broker.
   
   My question is: is the above solution fine to use? Does it conform to best practice under such a heavy load?
   The biggest problem is that I read bulk data from an infinite stream, which is very fast (`>= 60_000 items/s`), and then send that data to the Pulsar broker.
   If I use `send()` for each message, the publish throughput would be extremely degraded.





[GitHub] [pulsar] leiless edited a comment on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless edited a comment on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-855166878


   Instead of an `fsync(2)`-per-message manner, I'm pursuing `fsync(2)` in batch (as a whole).
   Explicitly: `fsync(2)` only for the last message in a list.
   Would that enforce the previous messages to be persisted to disk as well?
   
   (Roughly speaking, can you treat `fsync(2)` as the ACK here?)
   
   ```
       Before:
       
           +-------+-------+-------+-------+
           | Msg N |  ...  | Msg 2 | Msg 1 | -> Head up
           | fsync | fsync | fsync | fsync |
           +-------+-------+-------+-------+
       
       After:
   
           +-------+-------+-------+-------+
           | Msg N |  ...  | Msg 2 | Msg 1 | -> Head up
           |*fsync*|       |       |       |
           +-------+-------+-------+-------+
   ```
   
   The idea is somewhat like [Consumer.acknowledgeCumulative(MessageId)](https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Consumer.html#acknowledgeCumulative-org.apache.pulsar.client.api.MessageId-) but on the Producer side.





[GitHub] [pulsar] leiless edited a comment on issue #10832: [Question] Can producer send() cause cumulative ACK from previous sendAsync()?

leiless edited a comment on issue #10832:
URL: https://github.com/apache/pulsar/issues/10832#issuecomment-856694634


   @lhotari, many thanks for your reply.
   
   For the [`flush()`](https://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer.html#flush--) method, my use case turns out to be:
   * Producer `.maxPendingMessages(10_000)`
   * Producer `.blockIfQueueFull(true)`
   * I have many messages to be sent, publish throughput `>= 10_000 msgs/s`
   * Use `sendAsync()` for the first `N-1` messages and **still** ignore the `CompletableFuture<MessageId>`
   * **[NEW]** Use `flush()` after the last (`N`-th) message to ensure that all `N` messages are persisted in the Pulsar broker.
   
   My question is: is the above solution fine to use?
   The biggest problem is that I read bulk data from an infinite stream, which is very fast (`>= 60_000 items/s`), and then send that data to the Pulsar broker.
   If I use `send()`, the publish throughput would be extremely degraded.

