Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/10 18:14:09 UTC

[GitHub] [pulsar] horizonzy opened a new pull request, #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

horizonzy opened a new pull request, #15107:
URL: https://github.com/apache/pulsar/pull/15107

   Fixes #15106
   
   ### Motivation
   Fix the message discard problem that occurs when all pending receives are done.
   
   ### Documentation
     
   - [ ] `no-need-doc` 
   (Please explain why)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] horizonzy commented on a diff in pull request #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r846916819


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,38 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return the message is enqueue incomingMessages.
      */
-    void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
-        if (pendingReceives.isEmpty()) {
-            return;
-        }
-
+    boolean notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
         // fetch receivedCallback from queue
         final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
-            return;
+            return message != null && getCurrentReceiverQueueSize() != 0;

Review Comment:
   Maybe not. If CurrentReceiverQueueSize == 0, the message can't be enqueued in the `incoming msg` queue.





[GitHub] [pulsar] Jason918 commented on a diff in pull request #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r846905545


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,38 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return the message is enqueue incomingMessages.
      */
-    void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
-        if (pendingReceives.isEmpty()) {
-            return;
-        }
-
+    boolean notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
         // fetch receivedCallback from queue
         final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
-            return;
+            return message != null && getCurrentReceiverQueueSize() != 0;

Review Comment:
   Can we just return true here? 



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,38 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return the message is enqueue incomingMessages.

Review Comment:
   Maybe it's better to invert the logic: return true if this message is handled by a pending receive.
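
A minimal sketch of the return-value convention suggested above, assuming a simplified, single-threaded stand-in for the consumer. The class `NotifySketch` and the methods `notifyPendingReceive`, `onMessage`, and `incomingSize` are hypothetical names for illustration; this is not the code in `ConsumerImpl`. The notify method returns true only when a pending receive took the message, and the caller decides whether to buffer it.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for a consumer; not Pulsar's ConsumerImpl.
class NotifySketch<T> {
    // Futures handed out to callers that are still waiting for a message.
    private final Queue<CompletableFuture<T>> pendingReceives = new ArrayDeque<>();
    // Messages that arrived while nobody was waiting.
    private final Queue<T> incomingMessages = new ArrayDeque<>();

    // Returns true if the message was handed to a pending receive.
    boolean notifyPendingReceive(T message) {
        CompletableFuture<T> future = pendingReceives.poll();
        if (future == null) {
            return false; // nobody is waiting; the caller decides what to do
        }
        future.complete(message);
        return true;
    }

    // The caller buffers the message only when no pending receive took it.
    void onMessage(T message) {
        if (!notifyPendingReceive(message)) {
            incomingMessages.add(message);
        }
    }

    // Serves a buffered message if one exists, otherwise registers a pending receive.
    CompletableFuture<T> receiveAsync() {
        T queued = incomingMessages.poll();
        if (queued != null) {
            return CompletableFuture.completedFuture(queued);
        }
        CompletableFuture<T> future = new CompletableFuture<>();
        pendingReceives.add(future);
        return future;
    }

    int incomingSize() {
        return incomingMessages.size();
    }

    public static void main(String[] args) {
        NotifySketch<String> consumer = new NotifySketch<>();
        CompletableFuture<String> waiting = consumer.receiveAsync();
        consumer.onMessage("first");  // handed to the waiting future
        consumer.onMessage("second"); // nobody is waiting, so it is buffered
        System.out.println(waiting.join());
        System.out.println(consumer.receiveAsync().join());
    }
}
```

With this convention the method only reports whether a waiting future consumed the message. Whether an unconsumed message may be buffered, for example under a zero-size receiver queue, stays with the caller.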





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r846916989


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,38 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return the message is enqueue incomingMessages.

Review Comment:
   fine.
   





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15107: [enhance] [client-consumer] Enhance robustness in consumer executeNotifyCallback.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r847889249


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,42 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return return true if this message is handled by pending receive.
      */
-    void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
-        if (pendingReceives.isEmpty()) {
-            return;
-        }
-
+    boolean notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
         // fetch receivedCallback from queue
         final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
-            return;
+            if (exception != null) {
+                log.warn("Received exception but no future to callback.", exception);
+                return true;
+            }
+            return getCurrentReceiverQueueSize() == 0;

Review Comment:
   got it.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r846822903


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,38 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return the message is enqueue incomingMessages.
      */
-    void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
-        if (pendingReceives.isEmpty()) {

Review Comment:
   I removed this check to avoid a thread race.





[GitHub] [pulsar] horizonzy closed pull request #15107: [enhance] [client-consumer] Enhance robustness in consumer executeNotifyCallback.

Posted by GitBox <gi...@apache.org>.
horizonzy closed pull request #15107: [enhance] [client-consumer] Enhance robustness in consumer executeNotifyCallback.
URL: https://github.com/apache/pulsar/pull/15107




[GitHub] [pulsar] horizonzy commented on pull request #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#issuecomment-1095806601

   > @horizonzy - thanks for your contribution. Is this for a bug that you observed? If so, which version of the Pulsar client were you using? Based on my understanding of the client, the race that you are seeking to fix isn't possible because the `pendingReceives` queue is only supposed to be updated from the `internalPinnedExecutor`, which prevents data races.
   
   No, I didn't find a clear bug here. I just saw potential risks in the code and tried to harden it. Sorry, my title may have been misleading; I have changed it.
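
The argument quoted above, that state touched only from a single pinned executor cannot race, can be illustrated with a minimal sketch. It assumes a plain `Executors.newSingleThreadExecutor()` standing in for `internalPinnedExecutor`; the class `PinnedExecutorSketch` and its methods are hypothetical and are not Pulsar's implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of thread confinement; not Pulsar's ConsumerImpl.
class PinnedExecutorSketch {
    // Deliberately not thread-safe: only tasks running on pinnedExecutor touch it.
    private final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();
    // Stand-in for internalPinnedExecutor: one thread, tasks run in submission order.
    private final ExecutorService pinnedExecutor = Executors.newSingleThreadExecutor();

    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pinnedExecutor.execute(() -> pendingReceives.add(future));
        return future;
    }

    void messageReceived(String message) {
        pinnedExecutor.execute(() -> {
            // isEmpty() and poll() run inside one task on the single thread,
            // so no other update to the queue can happen between them.
            if (pendingReceives.isEmpty()) {
                return; // a real consumer would buffer the message here
            }
            pendingReceives.poll().complete(message);
        });
    }

    void shutdown() {
        pinnedExecutor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        PinnedExecutorSketch consumer = new PinnedExecutorSketch();
        CompletableFuture<String> future = consumer.receiveAsync();
        consumer.messageReceived("hello");
        System.out.println(future.get(5, TimeUnit.SECONDS));
        consumer.shutdown();
    }
}
```

Under this confinement the `isEmpty()` check before `poll()` is redundant rather than racy, which matches the reviewer's reading of the original code.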




[GitHub] [pulsar] github-actions[bot] commented on pull request #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#issuecomment-1094339977

   @horizonzy:Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)






[GitHub] [pulsar] michaeljmarshall commented on pull request #15107: [enhance] [client-consumer] Enhance robustness in consumer executeNotifyCallback.

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#issuecomment-1097037259

   @horizonzy - given that there is no race because the use of a single thread for updating `pendingReceives` is intentionally part of this class's design, I don't think there is any need to change the current code. Do you agree?




[GitHub] [pulsar] github-actions[bot] commented on pull request #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#issuecomment-1094339991

   @horizonzy:Thanks for providing doc info!




[GitHub] [pulsar] horizonzy commented on a diff in pull request #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r847870425


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,38 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return the message is enqueue incomingMessages.
      */
-    void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
-        if (pendingReceives.isEmpty()) {

Review Comment:
   Hi, I was just reviewing the code and felt that the check here might be unnecessary and might introduce the risk of a thread race. I double-checked: it's a single scheduled executor, so it does not cause a thread race.
   





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #15107: [fix] [client-consumer] Fix message discard problem when pending receives all done.

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r847777124


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1155,16 +1156,24 @@ protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
     }
 
     private void executeNotifyCallback(final MessageImpl<T> message) {
+        internalPinnedExecutor.execute(() -> {
+            executeNotifyCallback0(message);
+        });
+    }
+
+    @VisibleForTesting
+    void executeNotifyCallback0(final MessageImpl<T> message) {

Review Comment:
   Instead of exposing this internal method for a test, are you able to create a functional test that verifies this edge case?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,38 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return the message is enqueue incomingMessages.
      */
-    void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
-        if (pendingReceives.isEmpty()) {

Review Comment:
   Can you clarify which race you're referencing? If you're referring to the implementation of `hasNextPendingReceive()`, `pendingReceives` is only supposed to be updated on the `internalPinnedExecutor`, so the check is redundant, but is not necessarily a race. (We can still remove the check, but I just want to see if there is another race you noticed.)





[GitHub] [pulsar] Jason918 commented on a diff in pull request #15107: [enhance] [client-consumer] Enhance robustness in consumer executeNotifyCallback.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r847877905


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,42 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return return true if this message is handled by pending receive.
      */
-    void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
-        if (pendingReceives.isEmpty()) {
-            return;
-        }
-
+    boolean notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
         // fetch receivedCallback from queue
         final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
-            return;
+            if (exception != null) {
+                log.warn("Received exception but no future to callback.", exception);
+                return true;
+            }
+            return getCurrentReceiverQueueSize() == 0;

Review Comment:
   Given the semantics of this method, this condition should not be handled here.
   Also, for `ZeroQueueConsumerImpl`, only one message is allowed into the incoming queue.





[GitHub] [pulsar] horizonzy commented on pull request #15107: [enhance] [client-consumer] Enhance robustness in consumer executeNotifyCallback.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#issuecomment-1097465147

   Thanks to all reviewers, I got it. This enhancement looks unnecessary, so I am closing it.

