Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/15 07:05:46 UTC

[GitHub] [pulsar] linlinnn opened a new pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

linlinnn opened a new pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240


   Fixes #10222
   Fixes #10173
   
   ### Motivation
   Fix the order guarantee for MultiTopicsConsumerImpl.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826670555


   But we should still try to improve the performance w/ a lock free solution in a separate issue ASAP.



[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826027339


   > And we can also try to notify pending receive after we put a request into pending request queue if incomingMessage is not empty.
   
   How do you want to deal with this point? 
   The only approach I can think of is something like a heartbeat.



[GitHub] [pulsar] MarvinCai edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821810264


   Locking prevents a piece of code from being executed by different threads at the same time, but IMO the root cause is not concurrent execution. The same piece of code gets executed by different threads at different times, and because of scheduling the result is non-deterministic. To fix that, we have to make the code run on only one thread.



[GitHub] [pulsar] codelipenghui edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820260465


   @linlinnn I have some doubts here: why do we need to guarantee message order across partitions (the MultiTopicsConsumerImpl consumes messages from multiple partitions)? 



[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821819223


   /pulsarbot run-failure-checks



[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820786672


   @MarvinCai 
   `poll` is not a blocking method; it returns the head of the queue, or null if the queue is empty.
   `pulsar-external-listener` executes [receivedFuture.complete(message)](https://github.com/apache/pulsar/blob/3e5fbcea4c424302cfd258d1238d2135eca2a555/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L235),
   and then the message is accepted by `pulsar-client-internal` for final processing. 
   
   Also, I see a similar lock in the batch receive method.
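To make the `poll` semantics above concrete, here is a small check against `java.util.concurrent.LinkedBlockingQueue`, used only as a stand-in for the consumer's receiver queue (`PollDemo` is a hypothetical name). Both `poll()` and `poll(0, TimeUnit.MILLISECONDS)`, the form used in the older `internalReceiveAsync`, return immediately with the head of the queue or `null`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that poll() never waits for an element to arrive.
class PollDemo {
    /** Fills a queue with the given contents, then polls twice using the two non-blocking forms. */
    static String[] pollTwice(String... contents) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (String s : contents) {
            queue.add(s);
        }
        // poll(): the head, or null immediately if the queue is empty.
        String first = queue.poll();
        String second;
        try {
            // A zero timeout means "do not wait", so this behaves like poll().
            second = queue.poll(0, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            second = null;
        }
        return new String[] {first, second};
    }
}
```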



[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821811201


   @MarvinCai
   I see that the message result is accepted by `pulsar-client-internal`, and `pulsar-external-listener` only helps complete the future.
   I have removed the lock; please take another look, thanks.



[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826028389


   > > And we can also try to notify pending receive after we put a request into pending request queue if incomingMessage is not empty.
   > 
   > How do you want to deal with this point?
   > I only think about solution like heartbeat.
   
   Could `notifyPendingReceive` recursively invoke itself, or submit itself to an executor, while both incomingQueue and pendingReceiveRequest are non-empty? 
   Every time it's called, the first message in incomingMessage is polled and passed to the first pending receive request (if either the message or the request is null, it just returns, so no message or pending request is dropped). This can always be executed by a single thread, so order can be guaranteed. 
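A minimal sketch of this proposal, assuming a simplified model rather than the real `ConsumerImpl` (`SingleThreadDispatcher` and its methods are hypothetical). Both the arrival path and the receive path only add to their queue and then schedule `notifyPendingReceive` on one single-threaded executor; the recursion is written as a loop that keeps pairing the head message with the head request while both exist. Both heads are peeked before anything is removed, and only the dispatcher thread removes, so nothing is dropped.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of a consumer; not the real Pulsar ConsumerImpl.
class SingleThreadDispatcher<T> {
    private final Queue<T> incomingMessages = new ConcurrentLinkedQueue<>();
    private final Queue<CompletableFuture<T>> pendingReceives = new ConcurrentLinkedQueue<>();
    // All pairing of messages with receive requests happens on this one thread.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();

    /** Arrival path: enqueue, then schedule a drain. */
    void messageReceived(T message) {
        incomingMessages.add(message);
        dispatcher.execute(this::notifyPendingReceive);
    }

    /** Receive path: park the request, then schedule a drain; never completes on the caller. */
    CompletableFuture<T> receiveAsync() {
        CompletableFuture<T> result = new CompletableFuture<>();
        pendingReceives.add(result);
        dispatcher.execute(this::notifyPendingReceive);
        return result;
    }

    // Runs only on the dispatcher thread, which is the only thread that removes elements.
    private void notifyPendingReceive() {
        while (incomingMessages.peek() != null && pendingReceives.peek() != null) {
            T message = incomingMessages.poll();
            pendingReceives.poll().complete(message);
        }
    }

    void close() {
        dispatcher.shutdown();
        try {
            dispatcher.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because `receiveAsync` also schedules a drain, a request parked after the last message arrived is still served; the cost is that every completion hops to the dispatcher thread.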



[GitHub] [pulsar] 315157973 commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826119480


   Another solution: https://github.com/apache/pulsar/pull/10352
   But I haven't tested its performance.



[GitHub] [pulsar] 315157973 commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615524721



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
       If it is to be lock-free, I suggest using executorProvider, so that the logic for the same consumerId is always processed on the same thread. Then compare the performance impact.
   
   If you want to use a lock, we only need to ensure thread safety for these two scenarios, which reduces the scope of the lock:
   1) When the poll returns no message, the request is put into the pendingQueue.
   2) When the pendingQueue is checked and found empty, the message enters enqueueMessageAndCheckBatchReceive.
   
   Your current logic still has thread-safety issues. If only one thread is consuming, a message may be left in the queue and never consumed while a pending future also sits in the pendingQueue. This situation occurs with the last message.
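To illustrate the last-message problem described above, here is a deterministic, single-threaded replay of one bad interleaving against a simplified model of the enqueue-first logic in the diff. `EnqueueFirstModel`, `receivePoll`, and `receiveRegister` are hypothetical names; the receive path is split into its two steps so the message arrival can be slotted between them, and `ArrayDeque` is fine here only because the replay is single-threaded.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the enqueue-first logic; the receive path is split into
// two steps so an arrival can be interleaved between them deterministically.
class EnqueueFirstModel {
    final Queue<String> incomingMessages = new ArrayDeque<>();
    final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    // Receive, step 1: poll the queue (null if empty).
    String receivePoll() {
        return incomingMessages.poll();
    }

    // Receive, step 2: nothing was polled, so park the request.
    CompletableFuture<String> receiveRegister() {
        CompletableFuture<String> result = new CompletableFuture<>();
        pendingReceives.add(result);
        return result;
    }

    // Arrival path as in the diff: enqueue, then feed the queue head to a pending request, if any.
    void messageReceived(String message) {
        incomingMessages.add(message);
        CompletableFuture<String> pending = pendingReceives.poll();
        if (pending != null) {
            pending.complete(incomingMessages.poll());
        }
    }

    /** Replays: poll (empty) -> last message arrives -> request parked. */
    static EnqueueFirstModel replayLastMessage() {
        EnqueueFirstModel m = new EnqueueFirstModel();
        String polled = m.receivePoll();   // null: the queue is still empty
        m.messageReceived("last");         // no pending request yet, so it stays queued
        if (polled == null) {
            m.receiveRegister();           // parked although a message is now queued
        }
        return m;                          // nothing else will ever pair them
    }
}
```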





[GitHub] [pulsar] 315157973 commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615524721



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
       If it is to be lock-free, I suggest using orderedExecutor, so that the logic for the same consumerId is always processed on the same thread. Then compare the performance impact.
   
   If you want to use a lock, we only need to ensure thread safety for these two scenarios, which reduces the scope of the lock:
   1) When the poll returns no message, the request is put into the pendingQueue.
   2) When the pendingQueue is checked and found empty, the message enters enqueueMessageAndCheckBatchReceive.
   
   Your current logic still has thread-safety issues. If only one thread is consuming, a message may be left in the queue and never consumed while a pending future also sits in the pendingQueue. This situation occurs with the last message.
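The orderedExecutor suggestion relies on one property: tasks with the same key always land on the same single-threaded executor, so they run in submission order. A minimal sketch of that property using only `java.util.concurrent`; `KeyedExecutor` is hypothetical and is not the API of any `OrderedExecutor` class, and keying by a `long` consumerId is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical keyed executor: one single-threaded executor per slot, chosen by key.
class KeyedExecutor {
    private final List<ExecutorService> slots = new ArrayList<>();

    KeyedExecutor(int numThreads) {
        for (int i = 0; i < numThreads; i++) {
            slots.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Same key -> same slot -> same thread, so tasks for one key run in submission order. */
    CompletableFuture<Void> submit(long key, Runnable task) {
        int slot = (int) Math.floorMod(key, (long) slots.size());
        return CompletableFuture.runAsync(task, slots.get(slot));
    }

    void shutdown() {
        for (ExecutorService s : slots) {
            s.shutdown();
        }
    }
}
```

Different keys can still run in parallel on other slots, which is where the performance comparison the comment asks for would matter.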





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821774944


   /pulsarbot run-failure-checks



[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826029000


   @MarvinCai 
   Sorry, I don't get your point.
   What about the last-message case (when no further message arrives to invoke notifyPendingReceive)?
   IMO, this is a trade-off: if we add a lock, we will lose 45% of the performance. I prefer to add a monitor thread to deal with the boundary case, even though that sometimes makes receiving the last message less timely.
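A sketch of the monitor-thread fallback, under the assumption of a simplified model (`MonitoredConsumer`, `tryPair`, and the period are hypothetical, not Pulsar settings). The fast path never blocks: whoever wins a compare-and-set does the pairing and everyone else skips. Anything a skipped call leaves unmatched, such as the last message, is picked up by a `ScheduledExecutorService` tick, at the cost of up to one period of extra latency. Cancellation and batch receive are not modeled.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model: a non-blocking fast path plus a periodic monitor for the boundary case.
class MonitoredConsumer<T> {
    private final Queue<T> incomingMessages = new ConcurrentLinkedQueue<>();
    private final Queue<CompletableFuture<T>> pendingReceives = new ConcurrentLinkedQueue<>();
    // At most one thread pairs at a time; others return immediately instead of waiting.
    private final AtomicBoolean pairing = new AtomicBoolean(false);
    private final ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();

    MonitoredConsumer(long periodMillis) {
        // Periodically retry pairing so an unmatched message/request cannot stay stuck.
        monitor.scheduleAtFixedRate(this::tryPair, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void messageReceived(T message) {
        incomingMessages.add(message);
        tryPair();
    }

    CompletableFuture<T> receiveAsync() {
        CompletableFuture<T> result = new CompletableFuture<>();
        pendingReceives.add(result);
        tryPair();
        return result;
    }

    // Pairs heads in FIFO order; only the thread holding the flag removes elements.
    void tryPair() {
        if (!pairing.compareAndSet(false, true)) {
            return; // someone else is pairing; the next monitor tick covers anything missed
        }
        try {
            while (incomingMessages.peek() != null && pendingReceives.peek() != null) {
                pendingReceives.poll().complete(incomingMessages.poll());
            }
        } finally {
            pairing.set(false);
        }
    }

    void close() {
        monitor.shutdownNow();
    }
}
```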



[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820765187


   I actually don't see the point of the locking we're trying to add here. Polling from the receiver queue or notifying a pending request is not the root cause. The BlockingQueue is itself thread-safe, and no matter which thread does the notification, it will always be a thread from "pulsar-external-listener" that processes the pending request.
   The root cause is that when a message is retrieved at this [line](https://github.com/apache/pulsar/blob/3e5fbcea4c424302cfd258d1238d2135eca2a555/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L245), the receiveAsync future can be completed from 2 different threads depending on the situation: if there are messages in the receiver queue, it is completed from the "pulsar-client-internal" thread; if it becomes a pending request, it is later completed from the "pulsar-external-listener" thread. Even if messages arrive at the underlying single ConsumerImpl in the correct order, the MultiTopicsConsumerImpl can see them out of order, because 2 threads are processing messages independently. 
   This only happens with the multi-topic consumer when message volume is small and there is no proper batching, as in our test case.
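The two-thread behavior described above follows from how non-async `CompletableFuture` callbacks are run: a callback attached to an already-completed future runs on the thread that attaches it, while a callback attached to a pending future runs, in practice with the OpenJDK implementation, on the thread that later completes it (the javadoc only says it *may* be). A small demonstration; `CallbackThreadDemo` and the thread name are hypothetical stand-ins for `pulsar-client-internal` and `pulsar-external-listener`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo: which thread runs a non-async thenApply callback?
class CallbackThreadDemo {
    /** Already-completed future: the callback runs right away on the attaching thread. */
    static String threadForCompletedFuture() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("msg");
        return done.thenApply(m -> Thread.currentThread().getName()).join();
    }

    /** Pending future: the callback runs when, and where, the future is completed. */
    static String threadForPendingFuture(String completerName) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        // Attach the callback first, then let another thread complete the future.
        CompletableFuture<String> observed = pending.thenApply(m -> Thread.currentThread().getName());
        Thread completer = new Thread(() -> pending.complete("msg"), completerName);
        completer.start();
        return observed.join();
    }
}
```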



[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826483184


   @codelipenghui @315157973 I rebased onto master and reverted #9261, PTAL



[GitHub] [pulsar] linlinnn removed a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn removed a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820294725







[GitHub] [pulsar] codelipenghui commented on pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-827360587


   > But we should still try to improve the performance w/ a lock free solution in a separate issue ASAP.



[GitHub] [pulsar] linlinnn removed a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn removed a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-822012430


   /pulsarbot run-failure-checks



[GitHub] [pulsar] 315157973 edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820548537


   This is indeed a problem
   ```
           //1
           Message<T> message = incomingMessages.poll();
           if (message == null) {
               //2
               pendingReceives.add(result);
               cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
           } else {
               //3
               messageProcessed(message);
               result.complete(beforeConsume(message));
           }
   ```
   
   ```
           // 4
           if (peekPendingReceive() != null) {
               notifyPendingReceivedCallback(message, null);
           }
           ...
           // Otherwise, the message enters incomingMessages
   ```
   
   When the execution order is 1, 4, 2, 1, 3, 4, the order of 3 and 4 cannot be guaranteed.
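The interleaving above can be replayed deterministically against a simplified, single-threaded model of the pre-fix logic: hand a new message straight to a pending request, otherwise enqueue it. `DirectHandoffModel` is hypothetical, and it uses `poll` where the real code uses `peekPendingReceive` plus `notifyPendingReceivedCallback`. Applying steps 1, 4 (message A), 2, 4 (message B) and then a second receive delivers B to the first receive and A to the second.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the pre-fix logic, driven one step at a time.
class DirectHandoffModel {
    final Queue<String> incomingMessages = new ArrayDeque<>();
    final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    // Step 4: if a receive is pending, hand the message over directly; otherwise enqueue it.
    void messageReceived(String message) {
        CompletableFuture<String> pending = pendingReceives.poll();
        if (pending != null) {
            pending.complete(message);
        } else {
            incomingMessages.add(message);
        }
    }

    /** Replays 1, 4 (A), 2, 4 (B), then 1 + 3 for a second receive; returns what each receive got. */
    static List<String> replay() {
        DirectHandoffModel m = new DirectHandoffModel();
        String polled = m.incomingMessages.poll();          // 1: queue empty, returns null
        m.messageReceived("A");                             // 4: nobody waiting yet, A is queued
        CompletableFuture<String> first = new CompletableFuture<>();
        if (polled == null) {
            m.pendingReceives.add(first);                   // 2: receive #1 is parked
        }
        m.messageReceived("B");                             // 4: B completes receive #1
        CompletableFuture<String> second =
                CompletableFuture.completedFuture(m.incomingMessages.poll()); // 1 + 3: receive #2 gets A
        List<String> delivered = new ArrayList<>();
        delivered.add(first.join());
        delivered.add(second.join());
        return delivered;
    }
}
```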



[GitHub] [pulsar] 315157973 commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r616605021



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1053,23 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
-            if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+            CompletableFuture<Message<T>> pendingReceiveFuture;
+            lock.writeLock().lock();
+            try {
+                enqueueMessage(message);

Review comment:
       Now that there is a lock, we do not need to enqueue first; we can pass the message directly to the future, which saves an enqueue and a dequeue.
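A sketch of what this suggestion could look like in a simplified model (`LockedHandoffConsumer` is hypothetical; it mirrors the diff's `lock.writeLock()` with a `ReentrantReadWriteLock`). Because both check-then-act sequences run under the same write lock, a pending receive implies an empty queue, so completing the oldest pending future directly with the new message preserves order. Completing inside the lock is a conservative choice here: it keeps completions in pairing order, but callbacks then run while the lock is held.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model. Invariant kept by the lock: while a receive is pending, incomingMessages is empty.
class LockedHandoffConsumer<T> {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Queue<T> incomingMessages = new ArrayDeque<>();
    private final Queue<CompletableFuture<T>> pendingReceives = new ArrayDeque<>();

    void messageReceived(T message) {
        lock.writeLock().lock();
        try {
            CompletableFuture<T> waiting = pendingReceives.poll();
            if (waiting != null) {
                waiting.complete(message);       // direct handoff: no enqueue/dequeue
            } else {
                incomingMessages.add(message);   // nobody waiting: keep it for a later receive
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    CompletableFuture<T> receiveAsync() {
        CompletableFuture<T> result = new CompletableFuture<>();
        lock.writeLock().lock();
        try {
            T message = incomingMessages.poll();
            if (message == null) {
                pendingReceives.add(result);     // nothing queued: park the request
            } else {
                result.complete(message);
            }
        } finally {
            lock.writeLock().unlock();
        }
        return result;
    }

    int queuedMessages() {
        lock.readLock().lock();
        try {
            return incomingMessages.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```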
   





[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615200762



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -407,21 +407,19 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
-        Message<T> message = null;
+
         try {
-            message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
+            reentrantLock.lock();

Review comment:
       remove lock





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821996826


   /pulsarbot run-failure-checks



[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-822426467


   /pulsarbot run-failure-checks



[GitHub] [pulsar] eolivelli commented on a change in pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r619972470



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -414,6 +414,7 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
         Message<T> message = null;
         try {
+            lock.writeLock().lock();

Review comment:
       Please move the lock acquisition out of the try block.
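The usual reason for this request, following the idiom shown in the `java.util.concurrent.locks.Lock` javadoc: acquire the lock before the `try`, so the `finally` only ever unlocks a lock the thread actually holds. If acquisition sits inside the `try` and fails, the `finally` calls `unlock()` on a lock that is not held, which throws `IllegalMonitorStateException` and hides the original error. A sketch with a hypothetical `GuardedCounter` class:

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example of the acquire-before-try idiom.
class GuardedCounter {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private long value;

    long incrementAndGet() {
        lock.writeLock().lock();       // acquire first: if this throws, the finally never runs
        try {
            return ++value;
        } finally {
            lock.writeLock().unlock(); // only reached when the lock is actually held
        }
    }

    /** Shows what a stray finally-unlock does when the lock was never acquired. */
    static boolean unlockWithoutHoldingThrows() {
        ReadWriteLock lock = new ReentrantReadWriteLock();
        try {
            lock.writeLock().unlock();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }
}
```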





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820178858


   @MarvinCai @codelipenghui @merlimat  Please take a look.



[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821810264


   locking preventing a piece of code to be executed by different thread at the same time, but IMO the root cause is not concurrent execution. It's a piece of code get executed by different thread at different time while due to scheduling the result is non- deterministic, to fix that we have to make the code run on only one thread.





[GitHub] [pulsar] codelipenghui edited a comment on pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
codelipenghui edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-827360587


   > But we should still try to improve the performance w/ a lock free solution in a separate issue ASAP.
   
   @MarvinCai Yes, we should continue to improve the performance. @315157973 is also working on a lock-free solution. We'd better revert first and then find a better solution to this issue.
   





[GitHub] [pulsar] MarvinCai edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821810264


   Locking prevents a piece of code from being executed by different threads at the same time, but IMO the root cause is **not concurrent execution**. The problem is that a piece of code gets executed by different threads at different times, and due to scheduling the result is non-deterministic. To fix that, we have to make the code run on only one thread.





[GitHub] [pulsar] linlinnn removed a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn removed a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821996826


   /pulsarbot run-failure-checks





[GitHub] [pulsar] 315157973 commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r619098207



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1053,23 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
-            if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+            CompletableFuture<Message<T>> pendingReceiveFuture;
+            lock.writeLock().lock();
+            try {
+                enqueueMessage(message);

Review comment:
       I recommend rolling back directly to avoid unknown problems, although performance will drop by 45%.
   With new changes, it is hard to spot thread-safety problems in code review.
   https://github.com/apache/pulsar/pull/9261
   Of course, if you can make it lock-free as @MarvinCai suggested, so much the better.
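For comparison with the lock-free discussion, here is a minimal sketch of what a single lock buys, assuming a much-simplified consumer with one message queue and one pending-receive queue. The class and method names are hypothetical and only loosely modeled on the `lock.writeLock()` block in the diff above; this is not Pulsar's `ConsumerImpl`. Because the check-then-enqueue on the IO path and the poll-then-park on the receive path both run under the same lock, neither can be split by the other. For brevity the sketch completes futures while holding the lock, so dependent callbacks also run under it; a real implementation may want to avoid that.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model of a consumer guarded by one lock; not Pulsar's ConsumerImpl.
class LockedConsumerSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Queue<String> incomingMessages = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    // IO-thread path: "is anyone waiting?" and the enqueue happen atomically.
    void messageReceived(String msg) {
        lock.writeLock().lock();
        try {
            CompletableFuture<String> waiter = pendingReceives.poll();
            if (waiter != null) {
                waiter.complete(msg);       // hand the message to the oldest waiting receive
            } else {
                incomingMessages.add(msg);  // nobody is waiting: buffer the message
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Application path: the poll and the decision to park happen atomically.
    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        lock.writeLock().lock();
        try {
            String msg = incomingMessages.poll();
            if (msg != null) {
                result.complete(msg);
            } else {
                pendingReceives.add(result);
            }
        } finally {
            lock.writeLock().unlock();
        }
        return result;
    }
}
```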







[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821811201


   @MarvinCai
   I see that the message result is accepted by the `pulsar-client-internal` thread, and the `pulsar-external-listener` thread helps complete the future.
   I have removed the lock; please take another look, thanks.





[GitHub] [pulsar] codelipenghui merged pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240


   





[GitHub] [pulsar] linlinnn removed a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn removed a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-822426467


   /pulsarbot run-failure-checks





[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820444419


   > I think we should guarantee order by processing incoming messages from a single thread, instead of adding a lock, which can impact performance due to context switching and waiting.
   
   @MarvinCai 
   Hmm, I agree with your consideration. But the concurrent `notify` and `offer` logic seems to conflict with a single-threaded approach without a lock.





[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-824489484


   Placeholder; I will post some of my thoughts tonight.





[GitHub] [pulsar] linlinnn removed a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn removed a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-822021850


   /pulsarbot run-failure-checks





[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615273474



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
##########
@@ -267,4 +261,10 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
 
     }
 
+    private MessageRouter messageRouter = new MessageRouter() {

Review comment:
       done







[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-822021850


   /pulsarbot run-failure-checks





[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826027339


   > And we can also try to notify pending receive after we put a request into pending request queue if incomingMessage is not empty.
   
   How do you want to deal with this point? 
   The only approach I can think of is something like a heartbeat.





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820293992


   > @linlinnn I have some doubts here, why we need to guarantee message order across partitions
   
   Yes, we don't need to guarantee message order across partitions, but we should guarantee message order within the same partition. `MultiTopicsConsumerImpl` iterates over its `ConsumerImpl` instances (each consuming messages from a single partition) to consume messages from multiple topics and multiple partitions:
   ```
   newConsumers.forEach(consumer -> {
       consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
       receiveMessageFromConsumer(consumer);
   });
   ```
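The loop in the snippet above re-arms itself per partition. A minimal sketch of that pattern, assuming a hypothetical `FakePartitionConsumer` whose `receiveAsync()` returns already-completed futures; the real `receiveMessageFromConsumer` also handles permits, pausing and failures, all omitted here. Each new receive is issued only from the previous receive's completion callback, so every message from one partition flows through a single chain of callbacks.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a single-partition consumer that hands out messages in order.
class FakePartitionConsumer {
    private final Deque<String> backlog;

    FakePartitionConsumer(List<String> messages) {
        this.backlog = new ArrayDeque<>(messages);
    }

    CompletableFuture<String> receiveAsync() {
        String next = backlog.poll();
        // With nothing left, return a future that stays incomplete, like a parked receive.
        return next == null ? new CompletableFuture<>() : CompletableFuture.completedFuture(next);
    }
}

class ReceiveLoopSketch {
    // Same shape as receiveMessageFromConsumer: issue one receive, and only issue the
    // next one from the completion callback, so each partition has at most one
    // receive outstanding at a time.
    static void receiveMessageFromConsumer(FakePartitionConsumer consumer, List<String> sink) {
        consumer.receiveAsync().thenAccept(msg -> {
            sink.add(msg);
            receiveMessageFromConsumer(consumer, sink);
        });
    }
}
```

With at most one receive outstanding per partition, the order in which those callbacks fire is exactly what has to stay deterministic; which thread completes each future is the part the rest of this thread is about.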





[GitHub] [pulsar] eolivelli commented on pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-827382633


   +1
   Let's merge as soon as CI passes; then we can figure out the best approach to improve the consumer.





[GitHub] [pulsar] 315157973 commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615530138



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
       The notify thread is thread-1.
   The consumer receive thread is thread-2.
   
   1. thread-2 polls the queue and gets no message (it is empty).
   2. thread-1 enqueues the message and checks whether there is a pending receive; it finds none.
   3. thread-2 puts its future into the pending-receive queue (not batch receive).
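To make the timeline concrete, here is a deterministic replay of it, assuming a simplified model with one message queue and one pending-receive queue (class and field names are hypothetical; the notify step mirrors the `enqueueMessage` / `peekPendingReceive()` / `incomingMessages.poll()` lines in the diff above). The three steps run sequentially in the stated order instead of on real threads. The result is one message left in the queue and one future left parked, with nothing to pair them if no further message arrives.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Deterministic, single-threaded replay of the three-step interleaving described above.
// Names are hypothetical; the logic is a simplification of the diff under review.
class LostWakeupReplay {
    final Queue<String> incomingMessages = new ConcurrentLinkedQueue<>();
    final Queue<CompletableFuture<String>> pendingReceives = new ConcurrentLinkedQueue<>();

    CompletableFuture<String> replay(String lastMessage) {
        CompletableFuture<String> result = new CompletableFuture<>();

        // Step 1 (thread-2, receive): the queue is empty, so poll returns null.
        String polled = incomingMessages.poll();

        // Step 2 (thread-1, notify): enqueue, then look for a waiting receive; none exists yet.
        incomingMessages.offer(lastMessage);
        if (pendingReceives.peek() != null) {
            pendingReceives.poll().complete(incomingMessages.poll());
        }

        // Step 3 (thread-2, receive): acting on the stale result of step 1, park the future.
        if (polled == null) {
            pendingReceives.add(result);
        } else {
            result.complete(polled);
        }
        return result;
    }
}
```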







[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826023385


   > It may cause a message in the queue to never be consumed, and there is also a pending future in the pendingQueue. This situation will occur in the scene of the last message.
   notify thread is thread-1
   consumer receive thread is thread-2
   1 thread-2 poll out of the message is null
   2 thread-1 The message enters the queue and judges whether the pending is empty, and it is found to be null
   3 thread-2 is put into pending future (not batch Receive)
   
   @MarvinCai 
   How can we deal with this case?





[GitHub] [pulsar] 315157973 edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820548537


   This is indeed a problem:
   ```
   // 1
   Message<T> message = incomingMessages.poll();
   if (message == null) {
       // 2
       pendingReceives.add(result);
       cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
   } else {
       // 3
       messageProcessed(message);
       result.complete(beforeConsume(message));
   }
   ```
   
   ```
   // 4
   if (peekPendingReceive() != null) {
       notifyPendingReceivedCallback(message, null);
   }
   ```
   
   When the execution order is 1, 4, 2, 1, 3, 4, the relative order of 3 and 4 cannot be guaranteed.
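A deterministic replay of that execution order, assuming a simplified model with hypothetical names: step 4 hands an arriving message straight to a waiting receive and otherwise enqueues it, as the pre-change `peekPendingReceive()` / `enqueueMessageAndCheckBatchReceive` branches in the diffs above do. Run sequentially, the earlier receive A ends up with the later message m2 while receive B gets m1. In the real client, step 3 and the callback from step 4 can additionally run on different threads, so which completion is observed first also depends on scheduling.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Single-threaded replay of the execution order 1, 4, 2, 1, 3, 4 from the comment above.
// Hypothetical, simplified model: step 4 hands a message straight to a waiting receive,
// and otherwise enqueues it.
class InterleavingReplay {
    private final Queue<String> incomingMessages = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    private String step1Poll() {
        return incomingMessages.poll();
    }

    private void step2Park(CompletableFuture<String> receive) {
        pendingReceives.add(receive);
    }

    private void step3Complete(CompletableFuture<String> receive, String msg) {
        receive.complete(msg);
    }

    private void step4Arrive(String msg) {
        CompletableFuture<String> waiter = pendingReceives.poll();
        if (waiter != null) {
            waiter.complete(msg);
        } else {
            incomingMessages.add(msg);
        }
    }

    // Returns the two receives in the order they were issued: [receiveA, receiveB].
    static List<CompletableFuture<String>> replay() {
        InterleavingReplay r = new InterleavingReplay();
        CompletableFuture<String> receiveA = new CompletableFuture<>();
        CompletableFuture<String> receiveB = new CompletableFuture<>();
        r.step1Poll();                       // 1: receive A finds the queue empty
        r.step4Arrive("m1");                 // 4: nobody is waiting yet, so m1 is enqueued
        r.step2Park(receiveA);               // 2: receive A parks
        String polled = r.step1Poll();       // 1: receive B polls m1
        r.step3Complete(receiveB, polled);   // 3: receive B completes with m1
        r.step4Arrive("m2");                 // 4: m2 goes straight to the parked receive A
        return List.of(receiveA, receiveB);
    }
}
```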





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826457026









[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-823066448


   PING @eolivelli  @315157973 
   





[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826025164


   @MarvinCai 
   `receiveAsync` will not be called again until the previous `CompletableFuture` has been completed,
   so there is no `next time receiveAsync() is called`.





[GitHub] [pulsar] codelipenghui edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826457683


   Looks good to me. @linlinnn, I think you can use this PR to revert #9261 first. @hangc0276, please also help take a look.





[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826667829


   LGTM; this is probably the hidden cause of many other flaky tests.





[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820293992


   > @linlinnn I have some doubts here, why we need to guarantee message order across partitions
   
   Yes, we don't need to guarantee message order across partitions, but we should guarantee message order within the same partition. `MultiTopicsConsumerImpl` iterates over its `ConsumerImpl` instances (each consuming messages from a single partition) to consume messages from multiple topics and multiple partitions. In this issue, `MultiTopicsConsumerImpl` broke the order of messages from the same partition:
   ```
   newConsumers.forEach(consumer -> {
       consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
       receiveMessageFromConsumer(consumer);
   });
   ```





[GitHub] [pulsar] MarvinCai edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-824489484


   @linlinnn 
   Sorry for the delay.
   To clarify the problem, it happens when:
   1. MultiConsumer tries to receive from an underlying consumer while that consumer's receiver queue is empty.
   2. The request enters the pending receive queue.
   3. A message is received by the IO thread; it checks the pending queue, finds it non-empty, polls the first pending receive and lets the external-listener thread complete the future.
   4. Meanwhile, if another message arrives and the pending receive queue is empty, the message goes into the receiver queue.
   5. If MultiConsumer receives again, it gets the message from the receiver queue without waiting, and this is done by the client-internal thread.
   
   Steps 3) and 5) are executed on different threads, hence the final order of messages seen by MultiConsumer can be non-deterministic.
   It only happens if we're not batching and the receiver queue is empty most of the time.
   First, my previous idea of using thenAcceptAsync() with a pinned executor **doesn't work as expected**; the order still depends on scheduling, which is non-deterministic.
   
   Looking at the problem, I think it's mostly caused by two facts:
   1. A message can be retrieved either from the receiver queue (no pending receive) or directly from the incoming broker message (pending receive exists).
   2. These two scenarios are executed by different threads.
   
   So we can solve them by:
   1. Enqueueing every message first, as you're doing now.
   2. Having only one thread fulfill all receive requests, in a fixed order.
   
   For 2), we can:
   1. Always fetch messages from the receiver queue.
   2. Put a receive request into the pending receive queue when a) the receiver queue is empty, b) the pending receive queue is non-empty, or c) the pending receive queue is empty but the last pending request has not yet completed.
   3. Otherwise, a receive request just polls a message from the receiver queue.
   4. Complete all receive requests on only one thread, the client-internal thread.
   
   **In this case, incomingMessages will be the sole queue that stores messages.
   The IO thread will be the only producer and the client-internal thread the only consumer, while the order of consumed messages is guaranteed by the fact that a) either requests are served from a queue, which is ordered, or b) the queue is empty and requests are fulfilled in the order they arrive, which is also ordered. And we can implement this without any implicit locking.** 
   
   `pendingReceives` doesn't use a lock, and in the high-throughput case most messages go into `incomingMessages` anyway, so I think this shouldn't have much negative impact on performance. But we should definitely test that.
   I tried implementing it: https://github.com/MarvinCai/pulsar/commit/1c269515179dce3f520011ac5d59f28e1a17ff47
   Tested with 50000 messages in NullValueTest, and it seems to work fine.
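A minimal sketch of the single-thread idea above, under simplifying assumptions: messages are plain strings; there is no batching, flow control or cancellation; and the class and method names are hypothetical, not those of the linked commit. Unlike step 3) of the proposal, this variant parks every receive request and lets one drainer thread pair requests with messages. That drops the fast path but keeps the two properties the proposal relies on: a single queue for messages, and a single thread completing every receive in FIFO order.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified model of the proposal: messages always go through
// incomingMessages, and only one thread pairs receive requests with messages.
class SingleDrainerConsumer implements AutoCloseable {
    private final Queue<String> incomingMessages = new ConcurrentLinkedQueue<>();
    private final Queue<CompletableFuture<String>> pendingReceives = new ConcurrentLinkedQueue<>();
    // Stands in for the client-internal thread; daemon so a forgotten close() cannot hang the JVM.
    private final ExecutorService internal = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "client-internal-sketch");
        t.setDaemon(true);
        return t;
    });

    // IO-thread side: always enqueue first, then schedule a drain.
    void messageReceived(String msg) {
        incomingMessages.offer(msg);
        internal.execute(this::drain);
    }

    // Application side: queue the request, then schedule a drain.
    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        pendingReceives.offer(result);
        internal.execute(this::drain);
        return result;
    }

    // Runs only on the internal thread, the sole consumer of both queues,
    // so a non-empty check cannot be invalidated before the matching poll.
    private void drain() {
        while (!pendingReceives.isEmpty() && !incomingMessages.isEmpty()) {
            pendingReceives.poll().complete(incomingMessages.poll());
        }
    }

    @Override
    public void close() {
        internal.shutdown();
    }
}
```

Every offer is followed by a drain scheduled on the same single thread, so whichever drain runs after both a message and a request are queued will pair them; no separate wake-up step is needed. Since completion always happens on the drainer thread, `thenAccept` callbacks registered before completion also run there, in completion order.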





[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615528424



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
       > Your current logic still has thread safety issues. If only one thread is consuming, it may cause a message in the queue to never be consumed, and there is also a pending future in the pendingQueue. This situation will occur in the scene of the last message.
   
   Thanks for your review. Could you show this case as a timeline?







[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-823066448


   PING @eolivelli 
   





[GitHub] [pulsar] MarvinCai edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826024566


   > > It may cause a message in the queue to never be consumed, and there is also a pending future in the pendingQueue. This situation will occur in the scene of the last message.
   > > notify thread is thread-1
   > > consumer receive thread is thread-2
   > > 1 thread-2 poll out of the message is null
   > > 2 thread-1 The message enters the queue and judges whether the pending is empty, and it is found to be null
   > > 3 thread-2 is put into pending future (not batch Receive)
   > 
   > @MarvinCai
   > how can we deal with this case?
   
   I don't see a liveness problem in this case. Even after step 3, where receive request A is put into the pending receive queue, the next time receiveAsync() is called, receive request B is put into the pending receive queue as well; if there is a message in incomingQueue, receive request A will be polled from pendingReceiveQueue first and receive the first message in the incomingMessages queue.
   I do see that in this scenario the size of the pending receive queue could grow, as we might not give notifyPendingRequest enough chances to run, but we could try to notify pendingReceiveRequest after we put a receive request into the queue if incomingQueue is not empty.





[GitHub] [pulsar] MarvinCai edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826024566


   > > It may cause a message in the queue to never be consumed, and there is also a pending future in the pendingQueue. This situation will occur in the scene of the last message.
   > > notify thread is thread-1
   > > consumer receive thread is thread-2
   > > 1 thread-2 poll out of the message is null
   > > 2 thread-1 The message enters the queue and judges whether the pending is empty, and it is found to be null
   > > 3 thread-2 is put into pending future (not batch Receive)
   > 
   > @MarvinCai
   > how can we deal with this case?
   
   I don't see a liveness problem in this case. Even after step 3, where receive request A is put into the pending receive queue, the next time receiveAsync() is called, receive request B is put into the pending receive queue as well; if there is a message in incomingQueue, receive request A will be polled from pendingReceiveQueue first and processed.
   I do see that in this scenario the size of the pending receive queue could grow, as we might not give notifyPendingRequest enough chances to run, but we could try to notify pendingReceiveRequest after we put a receive request into the queue if incomingQueue is not empty.
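The suggested mitigation can be checked against the same three-step timeline with a deterministic, single-threaded replay (hypothetical names, simplified model). Adding one notify after parking the receive pairs the stranded message with the parked future. This replay covers only that one ordering; with real threads, two notify calls racing on the same queues would still need care, for example by running notify on a single thread as proposed earlier in this thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Single-threaded replay of the three-step timeline, plus one extra step:
// after parking a receive, check again for queued messages. Hypothetical names.
class RecheckAfterParkReplay {
    final Queue<String> incomingMessages = new ArrayDeque<>();
    final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    // Pair waiting receives with queued messages, oldest first.
    void notifyPendingReceives() {
        while (!pendingReceives.isEmpty() && !incomingMessages.isEmpty()) {
            pendingReceives.poll().complete(incomingMessages.poll());
        }
    }

    CompletableFuture<String> replay(String lastMessage) {
        CompletableFuture<String> result = new CompletableFuture<>();
        String polled = incomingMessages.poll();   // step 1: the queue is empty
        incomingMessages.offer(lastMessage);        // step 2: enqueue ...
        notifyPendingReceives();                     // ... but nothing is parked yet
        if (polled == null) {
            pendingReceives.add(result);             // step 3: park the receive
            notifyPendingReceives();                 // extra step: re-check after parking
        } else {
            result.complete(polled);
        }
        return result;
    }
}
```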





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-822136496


   @eolivelli @lhotari PTAL





[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826026957


   > completableFuture
   
   The future will be completed once a new message comes in: in ConsumerImpl.messageReceived, it will try to notify pending receives after enqueueing the message.
   We can also try to notify pending receives after we put a request into the pending request queue, if incomingMessages is not empty.
   If no new message comes in, the request should just stay in the pending receive queue.





[GitHub] [pulsar] 315157973 commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826112959









[GitHub] [pulsar] 315157973 commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820567834


   Emm... We have orderExecutor. CC @hangc0276





[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615272057



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);

Review comment:
       Always enqueue first; the following `incomingMessages.poll()` then returns the head of the queue (never null). This way, messages are consumed in enqueue order.
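To show why enqueue-first matters, here is a single-threaded sketch (hypothetical `EnqueueFirstDemo`, `String` messages). It assumes `m1` is already queued while a receive is pending, which is the state the racy interleavings discussed in this thread can produce. Handing the arriving message straight to the waiter delivers `m2, m1`; enqueuing first and polling the head delivers `m1, m2`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Single-threaded illustration with hypothetical names: m1 is already in the
// queue when m2 arrives while a receive is pending.
class EnqueueFirstDemo {
    static List<String> deliver(boolean enqueueFirst) {
        Queue<String> incomingMessages = new ArrayDeque<>();
        List<String> delivered = new ArrayList<>();
        incomingMessages.add("m1");
        String arriving = "m2";
        boolean receivePending = true;

        if (enqueueFirst) {
            // Proposed order: enqueue, then hand the waiter the head of the queue.
            incomingMessages.add(arriving);
            if (receivePending) {
                delivered.add(incomingMessages.poll()); // the head, i.e. "m1"
            }
        } else if (receivePending) {
            // Previous order: the arriving message bypasses the queue entirely.
            delivered.add(arriving);
        } else {
            incomingMessages.add(arriving);
        }

        // Later receive() calls drain whatever is left, in FIFO order.
        while (!incomingMessages.isEmpty()) {
            delivered.add(incomingMessages.poll());
        }
        return delivered;
    }
}
```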







[GitHub] [pulsar] eolivelli commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615270295



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
##########
@@ -267,4 +261,10 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
 
     }
 
+    private MessageRouter messageRouter = new MessageRouter() {

Review comment:
       nit:
   static ?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
       `incomingMessages.poll() ` may return null
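One way to make the notify path tolerate a concurrent receive that already took the message is to poll the message first and only then take a waiter. This is a hedged sketch with hypothetical names (`NullSafeNotify.notifyOne`), using `LinkedBlockingDeque` so an undelivered message can be put back at the head. It assumes a single notifying thread (e.g. the I/O thread); two concurrent notifiers could still reorder messages when putting them back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;

class NullSafeNotify {
    // Returns true if a waiter was completed with a message.
    static <T> boolean notifyOne(LinkedBlockingDeque<T> incomingMessages,
                                 ConcurrentLinkedQueue<CompletableFuture<T>> pendingReceives) {
        T msg = incomingMessages.pollFirst();
        if (msg == null) {
            // A concurrent receive already took the message: keep the waiter pending.
            return false;
        }
        CompletableFuture<T> waiter = pendingReceives.poll();
        if (waiter == null || !waiter.complete(msg)) {
            // No waiter left, or it was cancelled meanwhile: put the message back
            // at the head so it is neither lost nor moved behind newer messages.
            incomingMessages.offerFirst(msg);
            return false;
        }
        return true;
    }
}
```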

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);

Review comment:
       why are you moving this before `if (peekPendingReceive() != null) {`?







[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r613970165



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -407,21 +407,19 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
-        Message<T> message = null;
+
         try {
-            message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
+            reentrantLock.lock();

Review comment:
       @eolivelli Thanks for your suggestion, could you explain a little bit?







[GitHub] [pulsar] MarvinCai commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r616774270



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1053,23 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
-            if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+            CompletableFuture<Message<T>> pendingReceiveFuture;
+            lock.writeLock().lock();

Review comment:
       I really don't think we should lock here, this method will be invoked by netty-io thread, we shouldn't ever block this thread.







[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820293992


   > @linlinnn I have some doubts here, why we need to guarantee message order across partitions
   
   Yes, we don't need to guarantee message order across partitions, but we should guarantee message order within the same partition. `MultiTopicsConsumerImpl` iterates over `ConsumerImpl` instances (each consuming messages from a single partition) to consume messages from multiple topics and partitions. In this issue, `MultiTopicsConsumerImpl` broke the order of messages from the same partition:
   ```
   newConsumers.forEach(consumer -> {
       // each ConsumerImpl consumes messages from a single partition
       consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
       receiveMessageFromConsumer(consumer);
   });
   ```
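The per-partition guarantee relies on the loop issuing the next `receiveAsync()` only after the previous one completes. The sketch below is a simplified, hypothetical model of that loop (`PartitionSource` stands in for a single-partition `ConsumerImpl`, messages are `String`s). It preserves per-partition order only as long as each source completes its futures in enqueue order, which is the property this PR tries to restore in `ConsumerImpl`.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a single-partition consumer that completes
// receive futures strictly in partition order.
class PartitionSource {
    private final ArrayDeque<String> remaining = new ArrayDeque<>();

    PartitionSource(String partition, int count) {
        for (int i = 0; i < count; i++) {
            remaining.add(partition + "-" + i);
        }
    }

    CompletableFuture<String> receiveAsync() {
        String next = remaining.poll();
        if (next == null) {
            // No data left: the future stays incomplete, like a receive waiting for data.
            return new CompletableFuture<>();
        }
        return CompletableFuture.completedFuture(next);
    }
}

// Simplified fan-in loop: one outstanding receive per partition, and the next
// one is issued only after the previous future completes.
class MultiSourceReceiver {
    final ConcurrentLinkedQueue<String> incomingMessages = new ConcurrentLinkedQueue<>();

    void receiveMessageFromConsumer(PartitionSource source) {
        source.receiveAsync().thenAccept(msg -> {
            incomingMessages.add(msg);
            receiveMessageFromConsumer(source);
        });
    }
}
```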





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826457026


   > IMO,it’s better to revert first, so that we can easily distinguish whether the flaky unit test is caused by my PR
   
   +1





[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r617212743



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1053,23 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
-            if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+            CompletableFuture<Message<T>> pendingReceiveFuture;
+            lock.writeLock().lock();
+            try {
+                enqueueMessage(message);

Review comment:
       @315157973 
   I think we should lock enqueue/notify and dequeue/pending together; otherwise we still hit the issue you mentioned above.
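A minimal sketch of locking enqueue/notify and dequeue/pending together, using one `ReentrantLock` around two plain `ArrayDeque`s. `LockedConsumer` and its methods are hypothetical, and completing futures after releasing the lock (so callbacks never run under it) is a design assumption. MarvinCai's concern about blocking the netty-io thread still applies, since `messageReceived` takes the lock.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

class LockedConsumer {
    private final ReentrantLock lock = new ReentrantLock();
    private final ArrayDeque<String> incomingMessages = new ArrayDeque<>();
    private final ArrayDeque<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        String msg;
        lock.lock();
        try {
            // Dequeue-or-register-pending is one atomic step.
            msg = incomingMessages.poll();
            if (msg == null) {
                pendingReceives.add(result);
            }
        } finally {
            lock.unlock();
        }
        if (msg != null) {
            result.complete(msg);
        }
        return result;
    }

    void messageReceived(String msg) {
        CompletableFuture<String> waiter;
        String head = null;
        lock.lock();
        try {
            // Enqueue-then-notify is one atomic step, so a waiter always gets the head.
            incomingMessages.add(msg);
            waiter = pendingReceives.poll();
            if (waiter != null) {
                head = incomingMessages.poll();
            }
        } finally {
            lock.unlock();
        }
        if (waiter != null) {
            // Completed outside the lock so user callbacks never run while holding it.
            waiter.complete(head);
        }
    }
}
```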







[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826024566


   > > It may cause a message in the queue to never be consumed while a future is also waiting in the pending queue. This situation can occur with the last message.
   > > The notify thread is thread-1.
   > > The consumer receive thread is thread-2.
   > > 1. thread-2 polls the queue and gets null.
   > > 2. thread-1 enqueues the message, checks whether a pending receive exists, and finds none.
   > > 3. thread-2 puts its future into the pending queue (not batch receive).
   > 
   > @MarvinCai
   > how can we deal with this case?
   
   I don't see a liveness problem in this case. Even after step 3, where receive request A is put into the pending receive queue, the next time receiveAsync() is called, receive request B is put into the pending receive queue as well; if there's a message in incomingQueue, receive request A will be polled and processed.
   I do see that in this scenario the size of the pending receive queue could grow, since we might not give notifyPendingRequest enough chances to run, but we could try to notify pendingReceiveRequest after we put a receive request into the queue if incomingQueue is not empty.





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821795489


   /pulsarbot run-failure-checks





[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820425210


   I think we should guarantee order by processing incoming messages on a single thread, instead of adding a lock, which can hurt performance due to context switches and waiting.
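A sketch of the single-threaded alternative, assuming all queue manipulation is funneled through one `ExecutorService` (hypothetical `SerializedConsumer`). Because only the event-loop thread touches the queues, no lock is needed and the I/O thread only submits a task; the cost is one task hand-off per message and per receive.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SerializedConsumer {
    // Hypothetical design: every queue operation runs on this one thread, so the
    // plain ArrayDeques below are never touched concurrently.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final ArrayDeque<String> incoming = new ArrayDeque<>();
    private final ArrayDeque<CompletableFuture<String>> pending = new ArrayDeque<>();

    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        eventLoop.execute(() -> {
            String msg = incoming.poll();
            if (msg != null) {
                result.complete(msg);
            } else {
                pending.add(result);
            }
        });
        return result;
    }

    // Safe to call from an I/O thread: it only submits a task and does not wait.
    void messageReceived(String msg) {
        eventLoop.execute(() -> {
            // On this thread a waiter can only exist while the queue is empty,
            // so handing msg to the oldest waiter keeps FIFO order.
            CompletableFuture<String> waiter = pending.poll();
            if (waiter != null) {
                waiter.complete(msg);
            } else {
                incoming.add(msg);
            }
        });
    }

    void shutdown() {
        eventLoop.shutdown();
    }
}
```

The executor thread is non-daemon, so callers are expected to invoke `shutdown()` when the consumer is closed.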





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826023385


   > The notify thread is thread-1.
   > The consumer receive thread is thread-2.
   > 1. thread-2 polls the queue and gets null.
   > 2. thread-1 enqueues the message, checks whether a pending receive exists, and finds none.
   > 3. thread-2 puts its future into the pending queue (not batch receive).
   
   @MarvinCai 
   How can we deal with this case?





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826027339


   > And we can also try to notify pending receive after we put a request into pending request queue if incomingMessage is not empty.
   
   How do you want to deal with this point? 





[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615537359



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
       Thanks, I see.
   What about adding a heartbeat message to keep the receive from hanging? 
   I prefer to find a lock-free solution.
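One possible reading of the heartbeat idea is a periodic task that re-runs the pending-receive notification, so a message stranded by the race is eventually delivered even if no new message arrives. The sketch below (hypothetical `HeartbeatConsumer`, with a `ScheduledExecutorService` as the timer) assumes that interpretation; it accepts up to one period of extra latency and periodic wake-ups instead of closing the race itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatConsumer {
    final ConcurrentLinkedQueue<String> incomingMessages = new ConcurrentLinkedQueue<>();
    final ConcurrentLinkedQueue<CompletableFuture<String>> pendingReceives = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService heartbeat = Executors.newSingleThreadScheduledExecutor();

    void start(long periodMillis) {
        // Each tick re-runs the notification; a task that threw would stop the
        // schedule, so notifyPendingReceives must not throw.
        heartbeat.scheduleAtFixedRate(this::notifyPendingReceives, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void notifyPendingReceives() {
        while (!pendingReceives.isEmpty()) {
            String msg = incomingMessages.poll();
            if (msg == null) {
                return;
            }
            // Only this method removes waiters in the sketch, so poll() is non-null.
            pendingReceives.poll().complete(msg);
        }
    }

    void close() {
        heartbeat.shutdownNow();
    }
}
```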







[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615273575



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
       see above







[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821791513


   > @linlinnn please rebase on latest apache/pulsar master since the problem in the master branch was fixed separately by #10250 .
   
   @lhotari Thanks. I mean, could you please help review this PR? 





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821776677


   CICD passed. Could you please help review if you are available? @lhotari 





[GitHub] [pulsar] MarvinCai edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820765379


   @merlimat @sijie @codelipenghui any thoughts on this?





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820965281


   @eolivelli @codelipenghui @lhotari Please help review this PR, thanks.





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826025164


   @MarvinCai 
   Wait a few minutes, I'll try a test.
   IMO, receiveAsync will only be called again after the completableFuture has been completed.





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820444419


   > I think we should guarantee order by processing incoming messages on a single thread, instead of adding a lock, which can hurt performance due to context switches and waiting.
   
   Emm. I agree with your concern. But the concurrent `notify` and `offer` logic seems to conflict with using a single thread without a lock.





[GitHub] [pulsar] lhotari commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821784186


   @linlinnn please rebase on latest apache/pulsar master since the problem in the master branch was fixed separately by #10250 .





[GitHub] [pulsar] eolivelli commented on a change in pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r619972470



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -414,6 +414,7 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
         Message<T> message = null;
         try {
+            lock.writeLock().lock();

Review comment:
       Please move the `lock()` call out of the try block.
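The usual `Lock` idiom is to call `lock()` immediately before `try`, so `finally` only runs `unlock()` once the lock is actually held. A small sketch with a hypothetical `LockIdiom` class, using a `ReentrantReadWriteLock` write lock as in the diff:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockIdiom {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int counter;

    int incrementAndGet() {
        // Acquire outside the try block. If lock() sat inside the try and failed,
        // finally would call unlock() on a lock this thread never acquired, which
        // throws IllegalMonitorStateException and hides the original failure.
        lock.writeLock().lock();
        try {
            return ++counter;
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean isWriteLockedByCurrentThread() {
        return lock.isWriteLockedByCurrentThread();
    }
}
```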







[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-824495546


   > Placeholder, will post some of my thoughts tonight.
   
   @MarvinCai 
   Look forward to it : )





[GitHub] [pulsar] 315157973 commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820548537


   This is indeed a problem:
   ```
   // 1
   Message<T> message = incomingMessages.poll();
   if (message == null) {
       // 2
       pendingReceives.add(result);
       cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
   } else {
       // 3
       messageProcessed(message);
       result.complete(beforeConsume(message));
   }
   ```
   
   ```
   // 4
   if (peekPendingReceive() != null) {
       notifyPendingReceivedCallback(message, null);
   }
   ```
   
   When the execution order is 1, 2, 1, 3, 4, the relative order of 3 and 4 cannot be guaranteed.
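A deterministic, single-threaded replay of one possible reading of that interleaving (hypothetical `OrderRaceReplay`, `String` messages, plain `ArrayDeque`s standing in for the consumer's queues). Receive A is issued first but ends up with `m2`, and it is delivered before receive B gets `m1`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OrderRaceReplay {
    // Returns deliveries in the order the futures completed.
    static List<String> replay() {
        ArrayDeque<String> incomingMessages = new ArrayDeque<>();
        ArrayDeque<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();
        List<String> delivered = new ArrayList<>();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        a.thenAccept(m -> delivered.add("A:" + m));
        b.thenAccept(m -> delivered.add("B:" + m));

        // 1 (receive A): the queue is empty, poll() returns null.
        String forA = incomingMessages.poll();
        // m1 arrives before A is registered: no pending receive, so it is queued.
        incomingMessages.add("m1");
        // 2 (receive A): A becomes a pending receive.
        if (forA == null) {
            pendingReceives.add(a);
        }
        // 1 (receive B): B finds m1 in the queue.
        String forB = incomingMessages.poll();
        // 4 (I/O thread): m2 arrives, sees pending A, and hands m2 straight to A.
        if (pendingReceives.peek() != null) {
            pendingReceives.poll().complete("m2");
        }
        // 3 (receive B): B completes with m1, after m2 was already delivered.
        if (forB != null) {
            b.complete(forB);
        }
        return delivered;
    }
}
```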





[GitHub] [pulsar] MarvinCai commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820765379


   @sijie @codelipenghui any thoughts on this?





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-822012430


   /pulsarbot run-failure-checks





[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r618965736



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1053,23 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
-            if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+            CompletableFuture<Message<T>> pendingReceiveFuture;
+            lock.writeLock().lock();
+            try {
+                enqueueMessage(message);

Review comment:
       @315157973
   WDYT?







[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615272057



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);

Review comment:
       Always enqueue first; the subsequent `incomingMessages.poll()` then returns the head of the queue (never null). This ensures that messages are consumed in the order they were enqueued.
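A minimal single-threaded sketch of the point above, under stated assumptions: `OrderingSketch` and its methods are hypothetical simplifications, not Pulsar's `ConsumerImpl`, and plain `String`s stand in for messages. It starts from a state that a race between receive and delivery can produce (an older message still queued while a receive is pending) and compares handing the new message straight to the waiting receive with enqueueing first and polling the head.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical, single-threaded model of a consumer's receive path (not Pulsar's real ConsumerImpl).
class OrderingSketch {
    final Queue<String> incomingMessages = new ArrayDeque<>();
    final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    // Old path: a waiting receive is handed the NEW message directly,
    // even if older messages are still sitting in incomingMessages.
    void messageReceivedOld(String msg) {
        CompletableFuture<String> pending = pendingReceives.poll();
        if (pending != null) {
            pending.complete(msg);
        } else {
            incomingMessages.add(msg);
        }
    }

    // New path: always enqueue first, then hand the HEAD of the queue to the
    // waiting receive. poll() cannot return null here because we just added.
    void messageReceivedNew(String msg) {
        incomingMessages.add(msg);
        CompletableFuture<String> pending = pendingReceives.poll();
        if (pending != null) {
            pending.complete(incomingMessages.poll());
        }
    }

    // Starts from a racy state: "m1" is queued AND a receive is pending.
    // Returns the order in which the application sees the two messages.
    static List<String> deliveryOrder(boolean useNewPath) {
        OrderingSketch consumer = new OrderingSketch();
        consumer.incomingMessages.add("m1");
        CompletableFuture<String> waiting = new CompletableFuture<>();
        consumer.pendingReceives.add(waiting);
        if (useNewPath) {
            consumer.messageReceivedNew("m2");
        } else {
            consumer.messageReceivedOld("m2");
        }
        List<String> seen = new ArrayList<>();
        seen.add(waiting.join());                   // first message the app receives
        seen.add(consumer.incomingMessages.poll()); // next message it would receive
        return seen;
    }
}
```

`deliveryOrder(false)` yields `[m2, m1]` and `deliveryOrder(true)` yields `[m1, m2]`. This models ordering only; it does nothing to make the check-then-act sequence safe against a concurrent receive.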







[GitHub] [pulsar] codelipenghui commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826176013


   I think we'd better revert #9261 first to avoid potential thread-safe problems.





[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821791513


   > @linlinnn please rebase on latest apache/pulsar master since the problem in the master branch was fixed separately by #10250 .
   
   @lhotari Thanks. What I meant is: could you please help review this PR? 





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-824495546


   > Placeholder, will post some of my thoughts tonight.
   
   Looking forward to it :)





[GitHub] [pulsar] 315157973 commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826112959


   I found a flaky test, `MultiTopicsReaderTest#testMultiTopic`: it fails because the last message is never consumed.
   
   If there is no better solution, I suggest fixing this problem as soon as possible.
   
   @codelipenghui @eolivelli 
   





[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615537359



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
       Thanks, I see.
   What about adding a heartbeat message to keep the thread from hanging? 
   I lean toward a lock-free solution.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821805795


   > > @linlinnn please rebase on latest apache/pulsar master since the problem in the master branch was fixed separately by #10250 .
   > 
   > @lhotari Thanks. I mean could you please help review this PR. 
   
   @linlinnn this PR includes "fix CICD" commits that have already been addressed in the master branch. By rebasing, I mean rebasing the commits in this PR onto the current master branch and removing the unnecessary "fix CICD" commits, which are unrelated to this PR. 
   I plan to do reviews on Monday; I hope that's fine.





[GitHub] [pulsar] codelipenghui commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826457683


   Looks good to me. @linlinnn, I think you can use this PR to revert #9261. @hangc0276, please also help take a look.





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820294725


   /pulsarbot run-failure-checks





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820786672


   @MarvinCai 
   The `poll` method is non-blocking: it returns the head of the queue, or null if the queue is empty.
   `pulsar-external-listener` executes `receivedFuture.complete(message)`,
   and then `pulsar-client-internal` picks up the message for final processing. 
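For reference, the JDK `BlockingQueue` semantics this comment relies on, shown with `LinkedBlockingQueue` as a stand-in for the consumer's incoming-message queue (`PollSketch` is a hypothetical helper): `poll()` returns immediately, `poll(0, unit)` does not wait either, and only `take()` or a positive timeout would block.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating non-blocking poll semantics.
class PollSketch {
    static String[] demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // poll() never blocks: on an empty queue it returns null immediately.
        String fromEmpty = queue.poll();
        String fromEmptyTimed;
        try {
            // A zero timeout also returns null at once when nothing is available.
            fromEmptyTimed = queue.poll(0, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        queue.add("m1");
        queue.add("m2");
        // On a non-empty queue, poll() removes and returns the head (FIFO order).
        String head = queue.poll();
        return new String[] {fromEmpty, fromEmptyTimed, head};
    }
}
```

`demo()` returns `{null, null, "m1"}`.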
   
   





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820398831


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-827360314


   ping @eolivelli Please help review this PR; this issue makes many tests unstable.





[GitHub] [pulsar] eolivelli commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r613944345



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -407,21 +407,19 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
-        Message<T> message = null;
+
         try {
-            message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
+            reentrantLock.lock();

Review comment:
       Please move this out of the `try` block (here and below).
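The usual rationale for this request, shown as a small hypothetical class rather than the PR's code: acquire the lock immediately before `try` and release it in `finally`. If `lock()` sat inside the `try` and failed, the `finally` block would call `unlock()` on a lock the thread never acquired, which throws `IllegalMonitorStateException` and hides the original error.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class demonstrating the lock-before-try idiom.
class LockIdiomSketch {
    private final ReentrantLock reentrantLock = new ReentrantLock();
    private int counter;

    int incrementAndGet() {
        // Acquire OUTSIDE the try block: finally only runs unlock()
        // once we are certain this thread actually holds the lock.
        reentrantLock.lock();
        try {
            return ++counter;
        } finally {
            reentrantLock.unlock();
        }
    }

    boolean isHeldByCurrentThread() {
        return reentrantLock.isHeldByCurrentThread();
    }
}
```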







[GitHub] [pulsar] codelipenghui edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826457683


   Looks good to me. @linlinnn, I think you can use this PR to revert #9261 first. @hangc0276, please also help take a look.





[GitHub] [pulsar] codelipenghui commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826429499


   @linlinnn @MarvinCai @eolivelli It seems the lock-free approach introduced by @315157973 has better performance and is simpler than introducing locks. WDYT?
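The thread does not show what that lock-free approach looks like, so the following is only an assumption-labelled illustration of one common lock-free pattern, not @315157973's implementation: confine every queue operation to a single-threaded executor, so that enqueue and receive run one at a time without explicit locks. `SerializedConsumerSketch` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: all state is touched only on `serial`, so no locks are needed.
class SerializedConsumerSketch implements AutoCloseable {
    private final ExecutorService serial = Executors.newSingleThreadExecutor();
    private final Queue<String> incoming = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();

    // Called by the I/O thread when a message arrives.
    void messageReceived(String msg) {
        serial.execute(() -> {
            incoming.add(msg);
            CompletableFuture<String> waiting = pending.poll();
            if (waiting != null) {
                waiting.complete(incoming.poll()); // hand over the oldest message
            }
        });
    }

    // Called by the application; the check and the registration run as one task.
    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        serial.execute(() -> {
            String msg = incoming.poll();
            if (msg != null) {
                result.complete(msg);
            } else {
                pending.add(result); // completed later by messageReceived
            }
        });
        return result;
    }

    @Override
    public void close() {
        serial.shutdown();
    }
}
```

Here the futures are completed on the executor thread itself; a real client would probably complete them elsewhere so that application callbacks cannot stall the serialized thread.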





[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820786672


   @MarvinCai 
   The `poll` method is non-blocking: it returns the head of the queue, or null if the queue is empty.
   `pulsar-external-listener` executes [receivedFuture.complete(message)](https://github.com/apache/pulsar/blob/3e5fbcea4c424302cfd258d1238d2135eca2a555/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L235),
   and then `pulsar-client-internal` picks up the message for final processing. 
   
   





[GitHub] [pulsar] 315157973 commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615677609



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
       In my opinion, this is not a good approach. It changes the current behavior and makes message delivery less timely.







[GitHub] [pulsar] linlinnn edited a comment on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn edited a comment on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820293992


   > @linlinnn I have some doubts here, why we need to guarantee message order across partitions
   
   Yes, we don't need to guarantee message order across partitions, but we should guarantee message order within the same partition. `MultiTopicsConsumerImpl` iterates over its `ConsumerImpl` instances (each consuming messages from a single partition) to consume messages from multiple topics and partitions. In this issue, `MultiTopicsConsumerImpl` broke the order of messages from the same partition:
   ```java
   newConsumers.forEach(consumer -> {
       // each ConsumerImpl consumes messages from a single partition
       consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
       receiveMessageFromConsumer(consumer);
   });
   ```
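A minimal sketch of the property being asked for, with hypothetical names and `partition:sequence` strings standing in for messages: two feeder threads, each playing a single-partition consumer, push into one shared queue. Cross-partition interleaving in the merged result is arbitrary, but each partition's subsequence must stay in order. The sketch checks that property; it does not reproduce the bug itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: messages are "partition:sequence" strings.
class PerPartitionOrderSketch {
    // True if every partition's messages appear with strictly increasing sequence numbers.
    static boolean isPerPartitionOrdered(List<String> merged) {
        Map<String, Integer> lastSeen = new HashMap<>();
        for (String msg : merged) {
            String[] parts = msg.split(":");
            int seq = Integer.parseInt(parts[1]);
            Integer prev = lastSeen.put(parts[0], seq);
            if (prev != null && prev >= seq) {
                return false; // a later message of this partition overtook an earlier one
            }
        }
        return true;
    }

    // Each feeder thread stands in for one single-partition consumer pushing
    // its messages, in order, into the shared queue of a multi-topic consumer.
    static List<String> mergeConcurrently(int perPartition) {
        BlockingQueue<String> shared = new LinkedBlockingQueue<>();
        List<Thread> feeders = new ArrayList<>();
        for (String partition : new String[] {"p0", "p1"}) {
            Thread feeder = new Thread(() -> {
                for (int i = 0; i < perPartition; i++) {
                    shared.add(partition + ":" + i);
                }
            });
            feeders.add(feeder);
            feeder.start();
        }
        try {
            for (Thread feeder : feeders) {
                feeder.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<String> merged = new ArrayList<>();
        shared.drainTo(merged);
        return merged;
    }
}
```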





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821773187


   /pulsarbot run-failure-checks





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821753181


   Any feedback? :) @eolivelli @MarvinCai @merlimat @315157973 





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820534582


   /pulsarbot run-failure-checks





[GitHub] [pulsar] 315157973 commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826447354


   IMO, it's better to revert first, so that we can easily tell whether the flaky unit test is caused by my PR.





[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-821800581


   /pulsarbot run-failure-checks





[GitHub] [pulsar] linlinnn commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r616885253



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1053,23 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
-            if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+            CompletableFuture<Message<T>> pendingReceiveFuture;
+            lock.writeLock().lock();

Review comment:
       Any good suggestions? 
   I'm willing to try.







[GitHub] [pulsar] linlinnn commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-820577611


   /pulsarbot run-failure-checks





[GitHub] [pulsar] 315157973 commented on a change in pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#discussion_r615530138



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1050,15 +1043,15 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
 
             // Enqueue the message so that it can be retrieved when application calls receive()
             // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
-            // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
             if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                     redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                 possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
                         Collections.singletonList(message));
             }
+            enqueueMessage(message);
             if (peekPendingReceive() != null) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(incomingMessages.poll(), null);

Review comment:
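       The notifying thread is thread-1, and the consumer's receive thread is thread-2:
   
   1. thread-2 polls the queue and gets null (no message yet).
   2. thread-1 enqueues the message, checks whether a receive is pending, and finds none.
   3. thread-2 registers its future as a pending receive (non-batch receive).
   
   The message now sits in the queue, and the pending receive is not completed until another message arrives.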
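One way to close the window in the sequence above, sketched under the assumption that a single lock guards both the message queue and the pending-receive queue: make "check the queue, else register a pending receive" and "enqueue, then check for a pending receive" mutually exclusive. `LockedConsumerSketch` and its methods are hypothetical; an earlier revision of this PR uses a `ReadWriteLock`'s write lock, while a plain `ReentrantLock` is used here for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplified consumer: both paths check-and-act under one lock.
class LockedConsumerSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<String> incoming = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();

    // thread-1: a message arrives from the broker connection.
    void messageReceived(String msg) {
        CompletableFuture<String> toComplete;
        String head = null;
        lock.lock();
        try {
            incoming.add(msg);
            toComplete = pending.poll();
            if (toComplete != null) {
                head = incoming.poll(); // oldest message; never null here
            }
        } finally {
            lock.unlock();
        }
        if (toComplete != null) {
            toComplete.complete(head); // complete outside the lock
        }
    }

    // thread-2: the application calls receiveAsync().
    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        String msg;
        lock.lock();
        try {
            msg = incoming.poll();
            if (msg == null) {
                pending.add(result); // registered atomically with the empty check
            }
        } finally {
            lock.unlock();
        }
        if (msg != null) {
            result.complete(msg);
        }
        return result;
    }

    // Runs n receives and n deliveries on two threads; returns what each receive got.
    static List<String> runConcurrently(int n) {
        LockedConsumerSketch c = new LockedConsumerSketch();
        List<CompletableFuture<String>> futures = new ArrayList<>();
        Thread receiver = new Thread(() -> {
            for (int i = 0; i < n; i++) futures.add(c.receiveAsync());
        });
        Thread deliverer = new Thread(() -> {
            for (int i = 0; i < n; i++) c.messageReceived("m" + i);
        });
        receiver.start();
        deliverer.start();
        try {
            receiver.join();
            deliverer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<String> got = new ArrayList<>();
        for (CompletableFuture<String> f : futures) got.add(f.getNow(null));
        return got;
    }
}
```

Completing the futures after `unlock()` keeps application callbacks from running while the lock is held. Because the two queues are never both non-empty, the k-th receive always gets the k-th message, which `runConcurrently` checks from two threads.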







[GitHub] [pulsar] codelipenghui commented on pull request #10240: [consumer] Revert "Remove consumer unnecessary locks (#9261)"

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-827361785


   @linlinnn Could you please help resolve the conflict?





[GitHub] [pulsar] 315157973 commented on pull request #10240: [consumer] fix order guarantee for MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10240:
URL: https://github.com/apache/pulsar/pull/10240#issuecomment-826308328


   The performance test using an Executor has been added.

