Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/24 16:40:51 UTC

[GitHub] [pulsar] 315157973 opened a new pull request #10352: Make Consumer thread safe and lock-free

315157973 opened a new pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352


   
   
   ### Motivation
   Lock-free solution for https://github.com/apache/pulsar/pull/10240
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org






[GitHub] [pulsar] eolivelli commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r620871200



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -957,10 +951,12 @@ private void closeConsumerTasks() {
     }
 
     private void failPendingReceive() {
-        if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
-            failPendingReceives(this.pendingReceives);
-            failPendingBatchReceives(this.pendingBatchReceives);
-        }
+        internalPinnedExecutor.execute(() -> {
+            if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {

Review comment:
       Testing `isShutdown()` is probably not enough, because someone may shut the executor down in the meantime.
   We can probably add a try/catch for `RejectedExecutionException`.
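
A minimal sketch of the race described in this review comment, assuming a plain `java.util.concurrent.ExecutorService`. `SafeSubmit` and `trySubmit` are hypothetical names used for illustration and are not part of the Pulsar client. The `isShutdown()` check can pass and the executor can still be shut down before `execute()` runs, so the call is guarded with a catch as well:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical helper for illustration; not part of the Pulsar client.
final class SafeSubmit {

    // Returns true if the task was accepted, false if the executor rejected it.
    static boolean trySubmit(ExecutorService executor, Runnable task) {
        if (executor == null || executor.isShutdown()) {
            return false; // fast path only; not sufficient on its own
        }
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            // The executor was shut down between the check and execute().
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        System.out.println(trySubmit(executor, () -> { })); // accepted while running
        executor.shutdown();
        System.out.println(trySubmit(executor, () -> { })); // refused after shutdown
    }
}
```

Returning a boolean is only one option; the caller could equally log the rejection or fail the pending futures directly on the calling thread.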

##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
##########
@@ -48,6 +47,7 @@
         when(clientMock.timer()).thenReturn(mock(Timer.class));
 
         when(clientMock.externalExecutorProvider()).thenReturn(mock(ExecutorProvider.class));
+        when(clientMock.getInternalExecutorService()).thenReturn(Executors.newSingleThreadExecutor());
         when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));

Review comment:
       Please shut down this executor.
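
A sketch of the cleanup requested here, assuming the test owns the executor it creates. `ExecutorCleanup` and `runWithExecutor` are hypothetical names and do not come from `ClientTestFixtures`; the point is only that the executor is shut down in a `finally` block so its thread does not outlive the test:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical test helper for illustration; not code from ClientTestFixtures.
final class ExecutorCleanup {

    // Creates a single-thread executor, passes it to the test body,
    // and always shuts it down afterwards. Returns the (now shut down) executor.
    static ExecutorService runWithExecutor(Consumer<ExecutorService> testBody) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            testBody.accept(executor);
        } finally {
            executor.shutdownNow();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return executor;
    }

    public static void main(String[] args) {
        ExecutorService used = runWithExecutor(e -> e.execute(() -> { }));
        System.out.println(used.isShutdown());
    }
}
```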







[GitHub] [pulsar] eolivelli commented on pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#issuecomment-831842712


   Thank you @linlinnn for your comment.
   
   Let's wait for CI to complete.





[GitHub] [pulsar] 315157973 commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r625533508



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1292,17 +1280,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
+                internalPinnedExecutor.execute(() -> {
                     if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                         notifyPendingBatchReceivedCallBack();
                     }
-                } finally {
-                    lock.readLock().unlock();
-                }
-                singleMessagePayload.release();
+                    singleMessagePayload.release();
+                    tryTriggerListener();

Review comment:
       The behavior here is unchanged. The previous logic was:
   ```
   if () {
     // do something
   } else {
     // do something
   }
   triggerListener
   ```
   I just put `triggerListener` into the if and else branches respectively.
   
   If I don't do this, `triggerListener` will run in parallel with the if/else logic.
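
The ordering argument in this comment can be sketched as follows, assuming the pinned executor is single-threaded. `InTaskTrigger` and its event names are hypothetical stand-ins, not Pulsar code. Because the enqueue step and the trigger step sit in the same task, the trigger cannot overlap the step before it:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer's receive path; not Pulsar code.
final class InTaskTrigger {

    // Runs "enqueue" and "trigger" for each message inside one task on a
    // single-thread executor and returns the order in which the steps ran.
    static List<String> process(int messages) {
        ExecutorService pinned = Executors.newSingleThreadExecutor();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < messages; i++) {
            final int id = i;
            pinned.execute(() -> {
                events.add("enqueue-" + id);
                events.add("trigger-" + id); // same task, so it runs right after the enqueue step
            });
        }
        pinned.shutdown();
        try {
            pinned.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(process(2));
    }
}
```

If the trigger were instead called on the submitting thread, it could run before the enqueue task has executed, which is the parallel execution this comment warns about.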
   







[GitHub] [pulsar] linlinnn commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r625564953



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1292,17 +1280,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
+                internalPinnedExecutor.execute(() -> {
                     if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                         notifyPendingBatchReceivedCallBack();
                     }
-                } finally {
-                    lock.readLock().unlock();
-                }
-                singleMessagePayload.release();
+                    singleMessagePayload.release();
+                    tryTriggerListener();

Review comment:
       You put `triggerListener` into the [loop](https://github.com/apache/pulsar/blob/c1517ffbe21f767a1147633f71e0bc0153b5a7db/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1240).
   This may hurt performance when the batch size is large:
   before, we enqueued all messages and triggered once; now we trigger once for every message we enqueue.
   
   What about just:
   
   ```
   if () {
     // do something
   } else {
     // do something
   }
   internalPinnedExecutor.execute(() -> {
         triggerListener();
   });
   ```
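
A sketch of the alternative proposed here, assuming `internalPinnedExecutor` behaves like a single-thread executor that runs tasks in submission order. `BatchedTrigger` is a hypothetical stand-in, not Pulsar code. Each message gets its own enqueue task, and a single trigger task is queued after the loop, so it runs once and sees the whole batch:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the consumer's batch receive path; not Pulsar code.
final class BatchedTrigger {

    // Enqueues batchSize messages on a single-thread executor, then queues one trigger task.
    // Returns {messages visible to the trigger, number of trigger calls}.
    static int[] process(int batchSize) {
        ExecutorService pinned = Executors.newSingleThreadExecutor();
        Queue<Integer> incoming = new ConcurrentLinkedQueue<>();
        AtomicInteger seenByTrigger = new AtomicInteger();
        AtomicInteger triggerCalls = new AtomicInteger();

        for (int i = 0; i < batchSize; i++) {
            final int id = i;
            pinned.execute(() -> incoming.add(id)); // one enqueue task per message
        }
        // One trigger task for the whole batch, queued behind every enqueue task.
        pinned.execute(() -> {
            triggerCalls.incrementAndGet();
            seenByTrigger.set(incoming.size());
        });

        pinned.shutdown();
        try {
            pinned.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {seenByTrigger.get(), triggerCalls.get()};
    }

    public static void main(String[] args) {
        int[] result = process(100);
        System.out.println(result[0] + " messages seen, " + result[1] + " trigger call(s)");
    }
}
```

The trade-off against triggering inside each task is one extra queued task per batch in exchange for a single listener trigger per batch instead of one per message.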







[GitHub] [pulsar] 315157973 commented on pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#issuecomment-826636720


   Wow, all unit tests are done at once





[GitHub] [pulsar] 315157973 commented on pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#issuecomment-827306452


   With the lock-based approach, the following flaky tests still fail:
   https://github.com/apache/pulsar/pull/10240/checks?check_run_id=2435235663#step:9:783
   ![image](https://user-images.githubusercontent.com/9758905/116182799-70b38400-a74f-11eb-8cf4-03701d1431ab.png)
   
   With the thread-pool approach:
   https://github.com/apache/pulsar/pull/10352/checks?check_run_id=2440197210#step:9:782
   ![image](https://user-images.githubusercontent.com/9758905/116182901-9b054180-a74f-11eb-9a68-e1029e20a33a.png)
   
   Therefore, the thread-pool approach and the lock approach behave the same in these runs. I haven't seen any other flaky failures when using the thread pool.





[GitHub] [pulsar] 315157973 commented on pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#issuecomment-828451004


   @merlimat @sijie @rdhabalia @codelipenghui  @eolivelli PTAL, thanks





[GitHub] [pulsar] MarvinCai commented on pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#issuecomment-830034304


   nice





[GitHub] [pulsar] 315157973 commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r620917469



##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
##########
@@ -48,6 +47,7 @@
         when(clientMock.timer()).thenReturn(mock(Timer.class));
 
         when(clientMock.externalExecutorProvider()).thenReturn(mock(ExecutorProvider.class));
+        when(clientMock.getInternalExecutorService()).thenReturn(Executors.newSingleThreadExecutor());
         when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));

Review comment:
       Oh, nice catch, thanks.







[GitHub] [pulsar] linlinnn commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r625489507



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1292,17 +1280,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
+                internalPinnedExecutor.execute(() -> {
                     if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                         notifyPendingBatchReceivedCallBack();
                     }
-                } finally {
-                    lock.readLock().unlock();
-                }
-                singleMessagePayload.release();
+                    singleMessagePayload.release();
+                    tryTriggerListener();

Review comment:
       Overall it looks good; I just left one comment.
   I don't think we need to move `tryTriggerListener` into the loop or into this execute block. It's better to keep it as it was, because a `Listener` cannot be used together with `receive` or `receiveAsync`.







[GitHub] [pulsar] 315157973 edited a comment on pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 edited a comment on pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#issuecomment-826308185


   The performance test results are in two folders, 2.6.1 and 2.8.0:
   https://github.com/315157973/openmessaging-benchmark/tree/executor/bin
   The dependency in the pom is currently 2.8.0. To switch to 2.6.1, you only need to change the dependency to the 2.6.1 version of pulsar-client-all.
   
   Test command: bin/benchmark --drivers driver-pulsar/pulsar.yaml workloads/max-rate-1-topic-20-partitions-20p-20c-1kb.yaml
   
   Tested with 5 IO threads.
   Performance seems to have improved by 50%, which is consistent with the previous test results.
   
   client 2.6.1:
   ![image](https://user-images.githubusercontent.com/9758905/115991390-b27ee600-a5fa-11eb-89ce-e203c6065e54.png)
   
   client 2.8.0-SNAPSHOT (using the executor):
   ![image](https://user-images.githubusercontent.com/9758905/115991407-d6dac280-a5fa-11eb-9c08-d3052dff4920.png)
   





[GitHub] [pulsar] 315157973 commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r621087291



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -957,10 +951,12 @@ private void closeConsumerTasks() {
     }
 
     private void failPendingReceive() {
-        if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
-            failPendingReceives(this.pendingReceives);
-            failPendingBatchReceives(this.pendingBatchReceives);
-        }
+        internalPinnedExecutor.execute(() -> {
+            if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {

Review comment:
       I think we can add an `UncaughtExceptionHandler` to log errors.
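
A minimal sketch of that idea, assuming the executor is built with a custom `ThreadFactory`. `LoggingExecutor` and the thread name are hypothetical, and the handler records the message where a real client would log it. Note that the handler only sees exceptions from tasks started with `execute()`; with `submit()` the exception is captured in the returned `Future` instead:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper for illustration; not part of the Pulsar client.
final class LoggingExecutor {

    // Runs a failing task via execute() and returns the message seen by the
    // uncaught-exception handler, or null if the handler was not reached in time.
    static String runFailingTask(String message) {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);

        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "pinned-internal");
            thread.setUncaughtExceptionHandler((t, error) -> {
                // A real client would log the error here.
                seen.set(error.getMessage());
                handled.countDown();
            });
            return thread;
        };

        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        executor.execute(() -> {
            throw new IllegalStateException(message);
        });
        try {
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runFailingTask("boom"));
    }
}
```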




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] linlinnn commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r625564953



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1292,17 +1280,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
+                internalPinnedExecutor.execute(() -> {
                     if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                         notifyPendingBatchReceivedCallBack();
                     }
-                } finally {
-                    lock.readLock().unlock();
-                }
-                singleMessagePayload.release();
+                    singleMessagePayload.release();
+                    tryTriggerListener();

Review comment:
       You put `triggerListener` into the [cycle](https://github.com/apache/pulsar/blob/c1517ffbe21f767a1147633f71e0bc0153b5a7db/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1240)
   What about just 
   
   if(){
     // do some thing
   }else{
     // do some thing
   }
   internalPinnedExecutor.execute(() -> {
         triggerListener();
   });
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r625533508



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1292,17 +1280,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
+                internalPinnedExecutor.execute(() -> {
                     if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                         notifyPendingBatchReceivedCallBack();
                     }
-                } finally {
-                    lock.readLock().unlock();
-                }
-                singleMessagePayload.release();
+                    singleMessagePayload.release();
+                    tryTriggerListener();

Review comment:
       The behavior here is consistent, the previous logic is
   ```
   if(){
   }else{
   }
   triggerListener
   ```
   I just put the triggerListener into the if and else respectively.
   
   If I don’t do this, triggerListener will execute in parallel with the if-else logic
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] linlinnn commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r625564953



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1292,17 +1280,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
+                internalPinnedExecutor.execute(() -> {
                     if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                         notifyPendingBatchReceivedCallBack();
                     }
-                } finally {
-                    lock.readLock().unlock();
-                }
-                singleMessagePayload.release();
+                    singleMessagePayload.release();
+                    tryTriggerListener();

Review comment:
       You put `triggerListener` into the [cycle](https://github.com/apache/pulsar/blob/c1517ffbe21f767a1147633f71e0bc0153b5a7db/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1240)
   What about just 
   
   ```
   if(){
     // do some thing
   }else{
     // do some thing
   }
   internalPinnedExecutor.execute(() -> {
         triggerListener();
   });
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 edited a comment on pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 edited a comment on pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#issuecomment-826308185


   The performance test results are in two folders 2.6.1 and 2.8.0
   https://github.com/315157973/openmessaging-benchmark/tree/executor/bin
   The dependency in the pom is currently 2.8.0. To switch to 2.6.1, you only need to change the dependency to the 2.6.1 version of pulsar-client-all.
   
   5 IO threads
   Performance appears to have improved by 50%, which is in line with the previous test results.
   
   client 2.6.1 :
   ![image](https://user-images.githubusercontent.com/9758905/115991390-b27ee600-a5fa-11eb-89ce-e203c6065e54.png)
   
   client 2.8.0-snapshot (using the executor):
   ![image](https://user-images.githubusercontent.com/9758905/115991407-d6dac280-a5fa-11eb-9c08-d3052dff4920.png)
   





[GitHub] [pulsar] 315157973 commented on pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#issuecomment-826643707


   I will re-trigger CI several times to check for intermittent failures.








[GitHub] [pulsar] eolivelli commented on pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#issuecomment-831719031


   Waiting for @linlinnn's approval before merging.





[GitHub] [pulsar] linlinnn commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r625489507



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1292,17 +1280,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
+                internalPinnedExecutor.execute(() -> {
                     if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                         notifyPendingBatchReceivedCallBack();
                     }
-                } finally {
-                    lock.readLock().unlock();
-                }
-                singleMessagePayload.release();
+                    singleMessagePayload.release();
+                    tryTriggerListener();

Review comment:
       I don't think we need to move this into the loop or into this execute block. It's better to keep it as it was, because a `Listener` cannot be used together with `receive` or `receiveAsync`.







[GitHub] [pulsar] eolivelli commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r622301098



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -412,25 +415,17 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
-        Message<T> message = null;
-        lock.writeLock().lock();

Review comment:
       We can drop the `lock` field, as it is no longer used.
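
A minimal sketch of why the lock becomes unnecessary, assuming all state changes are funnelled through one single-threaded executor (the role `internalPinnedExecutor` appears to play in this diff). The class `PinnedQueueSketch`, its fields, and its method bodies are hypothetical illustrations and are not taken from `ConsumerImpl`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a consumer; not the actual ConsumerImpl code.
class PinnedQueueSketch {
    // Every access to the two queues below runs on this one thread,
    // so plain (non-concurrent) collections are safe without a lock.
    private final ExecutorService pinned = Executors.newSingleThreadExecutor();
    private final Queue<String> incoming = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        pinned.execute(() -> {
            String message = incoming.poll();
            if (message != null) {
                result.complete(message);      // a message was already waiting
            } else {
                pendingReceives.add(result);   // park the caller until one arrives
            }
        });
        return result;
    }

    void messageReceived(String message) {
        pinned.execute(() -> {
            CompletableFuture<String> waiter = pendingReceives.poll();
            if (waiter != null) {
                waiter.complete(message);      // hand over to a parked receive
            } else {
                incoming.add(message);         // nobody waiting, so queue it
            }
        });
    }

    void close() {
        pinned.shutdown();
    }

    static String demo() {
        PinnedQueueSketch consumer = new PinnedQueueSketch();
        CompletableFuture<String> first = consumer.receiveAsync(); // parks
        consumer.messageReceived("m1");                            // completes first
        consumer.messageReceived("m2");                            // gets queued
        CompletableFuture<String> second = consumer.receiveAsync();
        String out = first.join() + "," + second.join();
        consumer.close();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "m1,m2"
    }
}
```

Because a single-threaded executor runs tasks in submission order, the check-then-act sequences above cannot interleave, which is the property the read/write lock used to provide.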




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] linlinnn commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r625564953



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1292,17 +1280,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
+                internalPinnedExecutor.execute(() -> {
                     if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                         notifyPendingBatchReceivedCallBack();
                     }
-                } finally {
-                    lock.readLock().unlock();
-                }
-                singleMessagePayload.release();
+                    singleMessagePayload.release();
+                    tryTriggerListener();

Review comment:
       You put `triggerListener` inside the [loop](https://github.com/apache/pulsar/blob/c1517ffbe21f767a1147633f71e0bc0153b5a7db/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1240).
   This may hurt performance when the batchSize is large.
   What about just:
   
   ```
   if () {
     // do something
   } else {
     // do something
   }
   internalPinnedExecutor.execute(() -> {
       triggerListener();
   });
   ```
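
A rough sketch of the suggestion above, assuming the per-message work stays inside the loop and the listener trigger is submitted once after it. `BatchTriggerSketch`, the `triggerListener` runnable, and the counters are hypothetical and only illustrate the shape of the change; they are not the PR's code.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BatchTriggerSketch {
    // Returns {number of trigger calls, number of messages delivered}.
    static int[] demo(List<String> batch) {
        ExecutorService pinned = Executors.newSingleThreadExecutor();
        Queue<String> incoming = new ArrayDeque<>(); // touched only on the pinned thread
        AtomicInteger triggerCalls = new AtomicInteger();
        AtomicInteger delivered = new AtomicInteger();

        // Hypothetical listener trigger: drains everything queued so far.
        Runnable triggerListener = () -> {
            triggerCalls.incrementAndGet();
            while (incoming.poll() != null) {
                delivered.incrementAndGet();
            }
        };

        for (String message : batch) {
            // Per-message work only; no listener trigger inside the loop.
            pinned.execute(() -> incoming.add(message));
        }
        // One trigger for the whole batch, submitted after the loop.
        pinned.execute(triggerListener);

        pinned.shutdown(); // already-submitted tasks still run
        try {
            pinned.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {triggerCalls.get(), delivered.get()};
    }

    public static void main(String[] args) {
        int[] result = demo(Arrays.asList("a", "b", "c"));
        System.out.println(result[0] + " trigger, " + result[1] + " delivered");
    }
}
```

With the trigger inside the loop, a batch of N messages would submit N trigger tasks; here it submits one regardless of batch size.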







[GitHub] [pulsar] 315157973 commented on a change in pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352#discussion_r620879062



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -957,10 +951,12 @@ private void closeConsumerTasks() {
     }
 
     private void failPendingReceive() {
-        if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
-            failPendingReceives(this.pendingReceives);
-            failPendingBatchReceives(this.pendingBatchReceives);
-        }
+        internalPinnedExecutor.execute(() -> {
+            if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {

Review comment:
       But this queue is unbounded. If the thread pool has been shut down, can't it just throw an exception directly?
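
For reference, a small sketch of the standard JDK behaviour this question touches on, assuming the executor is a plain `Executors.newSingleThreadExecutor()` with the default rejection policy. Whether `internalPinnedExecutor` in the PR is configured the same way is not stated in this thread. The class name `RejectedAfterShutdownSketch` is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class RejectedAfterShutdownSketch {
    // Returns true if submitting to a shut-down executor is rejected.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService pinned = Executors.newSingleThreadExecutor();
        pinned.shutdown();
        try {
            // The work queue is unbounded, but a shut-down pool
            // refuses new tasks instead of queueing them.
            pinned.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAfterShutdownIsRejected()); // prints "true"
    }
}
```

Under that assumption, a caller that may run after shutdown has to either check `isShutdown()` first or be prepared to catch `RejectedExecutionException`.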







[GitHub] [pulsar] sijie merged pull request #10352: Make Consumer thread safe and lock-free

Posted by GitBox <gi...@apache.org>.
sijie merged pull request #10352:
URL: https://github.com/apache/pulsar/pull/10352


   

