You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/21 12:19:36 UTC

[GitHub] [pulsar] hangc0276 opened a new pull request #9261: remove consumer unnecessary locks

hangc0276 opened a new pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261


   ### Motivation
   1. The `ConsumerImpl` has many unnecessary locks for thread-safe Queue, such as `Queues.newConcurrentLinkedQueue`, `GrowableArrayBlockingQueue`, `ConcurrentLinkedQueue` 
   2. For `BatchReceive` APIs, there are locks to ensure exactly less than batchReceive policy.
   
   ### Changes
   1. Remove unnecessary locks in `ConsumerImpl`
   2. Remove the locks in `BatchReceive` APIs, with the cost of put one message more than batchReceive policy, to improve consume throughput
   
   Related to PR#8207


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-810707508


   @codelipenghui @sijie @merlimat PTAL again, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui edited a comment on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
codelipenghui edited a comment on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-764670117


   @hangc0276 
   
   > Remove the locks in BatchReceive APIs, with the cost of put one message more than batchReceive policy, to improve consume throughput
   
   This will break the current behavior. In some cases, user want a guarantee of the batch size.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on a change in pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#discussion_r561930966



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -774,16 +767,17 @@ protected void notifyPendingBatchReceivedCallBack() {
 
     protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
         MessagesImpl<T> messages = getNewMessagesImpl();
-        Message<T> msgPeeked = incomingMessages.peek();
-        while (msgPeeked != null && messages.canAdd(msgPeeked)) {
-            Message<T> msg = incomingMessages.poll();
+
+        Message<T> msg = null;
+        do {
+            msg = incomingMessages.poll();
             if (msg != null) {
                 messageProcessed(msg);
                 Message<T> interceptMsg = beforeConsume(msg);
                 messages.add(interceptMsg);
             }
-            msgPeeked = incomingMessages.peek();
-        }
+        } while (msg != null && messages.canAdd());

Review comment:
       Yes, it will exceed one message.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-767233751






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-811128278


   > @hangc0276 I think the description should be updated since the change does not break the batch receive behavior?
   
   @codelipenghui  Ok, i have updated the description.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-765176223


   I pressure test with pulser-perf reader
   ```
   1 process, 1 reader to consume 1 partition from earliest
   bin/pulsar-perf read -t 1 -q 10000 -u pulsar://xxx:6650 -i 10 -m earliest persistent://test/test/xxx-partition-0
   
   1 process, 3 reader to consume 3 partition from earliest
   bin/pulsar-perf read -t 3 -q 10000 -u pulsar://xxx:6650 -i 10 -m earliest persistent://test/test/xxx-partition
   ```
   
   The result as follow
   
   | version | reader | msgOut (msg/s) | bytesOut (Mbit/s) |
   | --- | --- | --- | --- |
   | pulsar-master | 3 reader | 36494.152 | 719.944 |
   | pulsar-master | 1 reader | 13201.557  | 260.447  | 
   | pulsar-master-patch | 3 reader  | 52088.967 | 1027.701 |
   | pulsar-master-patch | 1 reader  | 19219.543 | 379.273 |
   
   It has improved 45%.
   @315157973 @codelipenghui 
   
   For BatchReceive, I will test latter.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-764669987


   > I am interested in how much performance has improved
   
   @315157973 I will paste the perf result latter.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-764670117


   @hangc0276 
   
   > Remove the locks in BatchReceive APIs, with the cost of put one message more than batchReceive policy, to improve consume throughput
   
   This will break the current behavior. In some cases, user want a guarantee of the 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-764624698


   I am interested in how much performance has improved


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-767342438


   /pulsarbot run-failure-checks
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-772158141


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-766496055


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui closed pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
codelipenghui closed pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on a change in pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#discussion_r561930966



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -774,16 +767,17 @@ protected void notifyPendingBatchReceivedCallBack() {
 
     protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
         MessagesImpl<T> messages = getNewMessagesImpl();
-        Message<T> msgPeeked = incomingMessages.peek();
-        while (msgPeeked != null && messages.canAdd(msgPeeked)) {
-            Message<T> msg = incomingMessages.poll();
+
+        Message<T> msg = null;
+        do {
+            msg = incomingMessages.poll();
             if (msg != null) {
                 messageProcessed(msg);
                 Message<T> interceptMsg = beforeConsume(msg);
                 messages.add(interceptMsg);
             }
-            msgPeeked = incomingMessages.peek();
-        }
+        } while (msg != null && messages.canAdd());

Review comment:
       Yes, it will exceed one message.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-766496055






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-811123239


   @hangc0276 I think the description should be updated since the change does not break the batch receive behavior?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-767938106


   /pulsarbot run-failure-checks
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui edited a comment on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
codelipenghui edited a comment on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-764670117


   @hangc0276 
   
   > Remove the locks in BatchReceive APIs, with the cost of put one message more than batchReceive policy, to improve consume throughput
   
   This will break the current behavior. In some cases, user want a guarantee of the batch size.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-771335884


   @codelipenghui Can you review this again? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 closed pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 closed pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-764624698


   I am interested in how much performance has improved


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-766307076


   For BatchReceive, I add batchReceive policy for PerformanceConsumer #9295 , and test as follow
   1. create topic with 30 partitions (persistent://test/test/test_batch_v1)
   1. run perf producer at high performance
   2. run consumer with batch receive policy and start from latest to ensure read message from cache, in order to avoid read bookie entry disk
   
   perf command as follow:
   1. run perf producer
   ```
   bin/pulsar-perf produce -u pulsar://xxx:6650 -i 10 -r 500000 -threads 5 -ioThreads 3 persistent://test/test/test_batch_v1
   ```
   2. run consumer using pulsar-master lib and pulsar-master-patch lib
   ```
   bin/pulsar-perf consume -u pulsar://xxx:6650 -i 10 -sp Latest -time 240 -s batch-perf-v1 -q 10000 -bb 1048576 -bm 100 -bt 100 persistent://test/test/test_batch_v1
   ```
   
   The result as follow
   
   | version | msgOut (msg/s) | bytesOut (Mbit/s) |
   | --- | --- | --- |
   | pulsar-master | 110081.956 | 860.015 |
   | pulsar-master-patch | 116298.462 | 908.582 |
   
   The consume performance improved 5.6%
   The `pulsar-master-patch` version just remove the lock for batchReceive module.
   
   I will move the lock back for batchReceive module.
   @codelipenghui @315157973 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 closed pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 closed pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui closed pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
codelipenghui closed pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-772118269


   /pulsarbot run-failure-checks
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-772118269


   /pulsarbot run-failure-checks
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#discussion_r561854532



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -774,16 +767,17 @@ protected void notifyPendingBatchReceivedCallBack() {
 
     protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
         MessagesImpl<T> messages = getNewMessagesImpl();
-        Message<T> msgPeeked = incomingMessages.peek();
-        while (msgPeeked != null && messages.canAdd(msgPeeked)) {
-            Message<T> msg = incomingMessages.poll();
+
+        Message<T> msg = null;
+        do {
+            msg = incomingMessages.poll();
             if (msg != null) {
                 messageProcessed(msg);
                 Message<T> interceptMsg = beforeConsume(msg);
                 messages.add(interceptMsg);
             }
-            msgPeeked = incomingMessages.peek();
-        }
+        } while (msg != null && messages.canAdd());

Review comment:
       This will make the message size in the collection exceed the limit




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#discussion_r561918370



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -774,16 +767,17 @@ protected void notifyPendingBatchReceivedCallBack() {
 
     protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
         MessagesImpl<T> messages = getNewMessagesImpl();
-        Message<T> msgPeeked = incomingMessages.peek();
-        while (msgPeeked != null && messages.canAdd(msgPeeked)) {
-            Message<T> msg = incomingMessages.poll();
+
+        Message<T> msg = null;
+        do {
+            msg = incomingMessages.poll();
             if (msg != null) {
                 messageProcessed(msg);
                 Message<T> interceptMsg = beforeConsume(msg);
                 messages.add(interceptMsg);
             }
-            msgPeeked = incomingMessages.peek();
-        }
+        } while (msg != null && messages.canAdd());

Review comment:
       The price of removing the lock is that the size limit is not strictly less than that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-764670117


   @hangc0276 
   
   > Remove the locks in BatchReceive APIs, with the cost of put one message more than batchReceive policy, to improve consume throughput
   
   This will break the current behavior. In some cases, user want a guarantee of the 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-770669928


   I checked where the lock is removed, it should be thread safe
   
   cc @eolivelli 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-764669987


   > I am interested in how much performance has improved
   
   @315157973 I will paste the perf result latter.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-771335884


   @codelipenghui Can you review this again? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#discussion_r561854532



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -774,16 +767,17 @@ protected void notifyPendingBatchReceivedCallBack() {
 
     protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
         MessagesImpl<T> messages = getNewMessagesImpl();
-        Message<T> msgPeeked = incomingMessages.peek();
-        while (msgPeeked != null && messages.canAdd(msgPeeked)) {
-            Message<T> msg = incomingMessages.poll();
+
+        Message<T> msg = null;
+        do {
+            msg = incomingMessages.poll();
             if (msg != null) {
                 messageProcessed(msg);
                 Message<T> interceptMsg = beforeConsume(msg);
                 messages.add(interceptMsg);
             }
-            msgPeeked = incomingMessages.peek();
-        }
+        } while (msg != null && messages.canAdd());

Review comment:
       This will make the message size in the collection exceed the limit

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -774,16 +767,17 @@ protected void notifyPendingBatchReceivedCallBack() {
 
     protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
         MessagesImpl<T> messages = getNewMessagesImpl();
-        Message<T> msgPeeked = incomingMessages.peek();
-        while (msgPeeked != null && messages.canAdd(msgPeeked)) {
-            Message<T> msg = incomingMessages.poll();
+
+        Message<T> msg = null;
+        do {
+            msg = incomingMessages.poll();
             if (msg != null) {
                 messageProcessed(msg);
                 Message<T> interceptMsg = beforeConsume(msg);
                 messages.add(interceptMsg);
             }
-            msgPeeked = incomingMessages.peek();
-        }
+        } while (msg != null && messages.canAdd());

Review comment:
       The price of removing the lock is that the size limit is not strictly less than that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on pull request #9261: remove consumer unnecessary locks

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9261:
URL: https://github.com/apache/pulsar/pull/9261#issuecomment-766599202






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org