Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/11/15 13:45:49 UTC

[GitHub] [rocketmq] XiaoyiPeng opened a new pull request #3490: 1.fix(consumer): DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

XiaoyiPeng opened a new pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490


   **Make sure to set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   1. fix(consumer): `DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax)` has no effect.
   
   2. fix(consumer): Handle the `RejectedExecutionException` that may be thrown when calling `ConsumeMessageConcurrentlyService#submitConsumeRequestLater`.
   
   ## Brief changelog
   
   1. Replace the unbounded `this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>()` with a bounded queue when creating `ConsumeMessageConcurrentlyService#consumeExecutor`, so that `DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax)` takes effect (a `ThreadPoolExecutor` only adds threads beyond its core size once its queue is full).
   2. `consumeExecutor.submit(consumeRequest)` may throw a `RejectedExecutionException` in the code below:
   
   ```
   private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {
   
           this.scheduledExecutorService.schedule(new Runnable() {
   
               @Override
               public void run() {
                   ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
               }
           }, 5000, TimeUnit.MILLISECONDS);
   } 
   ```
   but the program does not handle it.
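
For context, the queue change in item 1 relies on documented `java.util.concurrent.ThreadPoolExecutor` behavior: threads beyond the core size are only created when the work queue refuses a task. The following is a minimal standalone sketch, not RocketMQ code; the class `PoolGrowthDemo`, the method `threadsCreated`, and the pool sizes are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only uses standard JDK APIs.
class PoolGrowthDemo {

    // Submits `tasks` blocking tasks and reports how many worker threads the pool created.
    static int threadsCreated(BlockingQueue<Runnable> queue, int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the measurement is taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: every task beyond the core size is queued, so the pool stays at 2 threads.
        System.out.println(threadsCreated(new LinkedBlockingQueue<Runnable>(), 2, 4, 10)); // prints 2
        // Bounded queue (capacity 2): once it is full, the pool grows to the maximum of 4 threads.
        System.out.println(threadsCreated(new ArrayBlockingQueue<Runnable>(2), 2, 4, 6));  // prints 4
    }
}
```

With the unbounded queue the maximum of 4 is never reached, which matches the symptom described for `setConsumeThreadMax`.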
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could complete the following 5 checklist items (the last one is optional) before requesting the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic correction; prefer mocking when cross-module dependencies exist. If a new feature or significant change is committed, please remember to add an integration test in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#discussion_r749927046



##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
##########
@@ -61,14 +61,25 @@
     private final ScheduledExecutorService scheduledExecutorService;
     private final ScheduledExecutorService cleanExpireMsgExecutors;
 
+    /**
+     * The default retry delay in milliseconds when submitting a {@link ConsumeRequest} encounters RejectedExecutionException
+     */
+    private static final int DEFAULT_CONSUME_REQUEST_LATER_DELAY_MILLS = 5000;

Review comment:
       It cannot be configured currently; the previous version hard-coded a value of 5000.
   Should I make it configurable?







[GitHub] [rocketmq] coveralls edited a comment on pull request #3490: 1.fix(consumer): DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#issuecomment-968956917


   
   [![Coverage Status](https://coveralls.io/builds/44286507/badge)](https://coveralls.io/builds/44286507)
   
   Coverage increased (+0.01%) to 55.112% when pulling **7448f9180296239d73a464a7c387e354e301a168 on XiaoyiPeng:push_consumer** into **4b8b307901da19a1e562c883adc1bdaf8e111cfb on apache:develop**.
   





[GitHub] [rocketmq] coveralls edited a comment on pull request #3490: 1.fix(consumer): DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#issuecomment-968956917


   
   [![Coverage Status](https://coveralls.io/builds/44286321/badge)](https://coveralls.io/builds/44286321)
   
   Coverage increased (+0.004%) to 55.103% when pulling **249ab0647eacdee55a2bc1d72304c13ad3cfdbfb on XiaoyiPeng:push_consumer** into **4b8b307901da19a1e562c883adc1bdaf8e111cfb on apache:develop**.
   





[GitHub] [rocketmq] coveralls edited a comment on pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#issuecomment-968956917


   
   [![Coverage Status](https://coveralls.io/builds/44290386/badge)](https://coveralls.io/builds/44290386)
   
   Coverage decreased (-0.02%) to 55.075% when pulling **2957d75d390b8e614b73ac2946b6d832ebcdc7c5 on XiaoyiPeng:push_consumer** into **4b8b307901da19a1e562c883adc1bdaf8e111cfb on apache:develop**.
   





[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on a change in pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#discussion_r749945537



##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
##########
@@ -190,7 +201,7 @@ public void submitConsumeRequest(
             try {
                 this.consumeExecutor.submit(consumeRequest);
             } catch (RejectedExecutionException e) {
-                this.submitConsumeRequestLater(consumeRequest);
+                this.submitConsumeRequestLater(consumeRequest, DEFAULT_CONSUME_REQUEST_LATER_DELAY_MILLS);

Review comment:
       Once the consumeRequestQueue is full, the request is still placed in the thread pool of scheduledExecutorService, so I don't think the bound achieves much.







[GitHub] [rocketmq] coveralls commented on pull request #3490: 1.fix(consumer): DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
coveralls commented on pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#issuecomment-968956917


   
   [![Coverage Status](https://coveralls.io/builds/44266275/badge)](https://coveralls.io/builds/44266275)
   
   Coverage decreased (-0.03%) to 55.066% when pulling **c6cab7510af2dfc81e05eccbe93f33745e4730a2 on XiaoyiPeng:push_consumer** into **4b8b307901da19a1e562c883adc1bdaf8e111cfb on apache:develop**.
   





[GitHub] [rocketmq] duhenglucky commented on a change in pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
duhenglucky commented on a change in pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#discussion_r749941782



##########
File path: client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
##########
@@ -62,9 +65,7 @@
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;

Review comment:
       Aggregating all imports into a wildcard import is not good practice.

##########
File path: client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
##########
@@ -163,6 +163,11 @@
      */
     private int consumeThreadMax = 20;
 
+    /**
+     * Queue size of consumption request;
+     */
+    private int consumeQueueSize = 10000;

Review comment:
       If we set up a bounded queue, one issue that needs to be considered is how to choose the rejection strategy when the queue is full, because this may cause message loss.
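
To make the concern about rejection strategies concrete, here is a small standalone sketch using only JDK classes. It is not the code from this PR: the class `RejectionPolicyDemo`, the method `submitAll`, and the pool of one thread with a queue of capacity one are assumptions chosen for illustration. It compares `AbortPolicy`, which reports a full queue to the caller, with `DiscardPolicy`, which drops the task silently.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it only uses standard JDK APIs.
class RejectionPolicyDemo {

    // Submits `tasks` blocking tasks to a pool with one thread and a queue of capacity one.
    // Returns {tasks that eventually ran, submissions that threw RejectedExecutionException}.
    static int[] submitAll(RejectedExecutionHandler policy, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();
                        ran.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // the caller still holds the task and could resubmit it later
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {ran.get(), rejected};
    }

    public static void main(String[] args) {
        int[] abort = submitAll(new ThreadPoolExecutor.AbortPolicy(), 5);
        int[] discard = submitAll(new ThreadPoolExecutor.DiscardPolicy(), 5);
        // AbortPolicy: 2 tasks run and 3 are reported to the caller.
        System.out.println("AbortPolicy:   ran=" + abort[0] + " rejected=" + abort[1]);
        // DiscardPolicy: 2 tasks run and 3 vanish without any signal.
        System.out.println("DiscardPolicy: ran=" + discard[0] + " rejected=" + discard[1]);
    }
}
```

Under these assumptions, only a policy that surfaces the rejection gives the caller a chance to keep the task, which is the property needed to avoid losing messages.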







[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#issuecomment-969889104


   > Can you create a corresponding issue for this PR?
   
   OK, thanks for reminding me, please see: #3493





[GitHub] [rocketmq] Git-Yang commented on pull request #3490: 1.fix(consumer): DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#issuecomment-969808088


   Can you create a corresponding issue for this PR?





[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#issuecomment-969887033


   > Can you create a corresponding issue for this PR?
   
   OK, thanks for reminding me, please see: #3493





[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#discussion_r749957843



##########
File path: client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
##########
@@ -62,9 +65,7 @@
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;

Review comment:
       I have corrected it, best regards.







[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#discussion_r750280808



##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
##########
@@ -190,7 +201,7 @@ public void submitConsumeRequest(
             try {
                 this.consumeExecutor.submit(consumeRequest);
             } catch (RejectedExecutionException e) {
-                this.submitConsumeRequestLater(consumeRequest);
+                this.submitConsumeRequestLater(consumeRequest, DEFAULT_CONSUME_REQUEST_LATER_DELAY_MILLS);

Review comment:
       Yeah, you are right; this just moves the task from the LinkedBlockingQueue to the DelayedWorkQueue (binary-heap based).
   







[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#discussion_r750280808



##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
##########
@@ -190,7 +201,7 @@ public void submitConsumeRequest(
             try {
                 this.consumeExecutor.submit(consumeRequest);
             } catch (RejectedExecutionException e) {
-                this.submitConsumeRequestLater(consumeRequest);
+                this.submitConsumeRequestLater(consumeRequest, DEFAULT_CONSUME_REQUEST_LATER_DELAY_MILLS);

Review comment:
       Yeah, you are right; this just moves the task from the LinkedBlockingQueue to the DelayedWorkQueue (binary-heap based).
   
   However, we need a place to save rejected tasks; otherwise messages will be lost.
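
The point about the DelayedWorkQueue can be illustrated with a short sketch: the queue inside a `ScheduledThreadPoolExecutor` is unbounded, so rescheduled tasks accumulate there instead of being rejected. This is a standalone illustration using JDK classes only; `DelayedQueueDemo` and `pendingAfterScheduling` are hypothetical names, and the one-hour delay is chosen only so that no task becomes due during the measurement.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only uses standard JDK APIs.
class DelayedQueueDemo {

    // Schedules `tasks` no-op tasks far in the future and returns how many are held in the queue.
    static int pendingAfterScheduling(int tasks) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        try {
            for (int i = 0; i < tasks; i++) {
                scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);
            }
            // None of the tasks is due yet, so all of them are still waiting in the delay queue.
            return scheduler.getQueue().size();
        } finally {
            scheduler.shutdownNow(); // discard the pending tasks and stop the worker thread
        }
    }

    public static void main(String[] args) {
        System.out.println(pendingAfterScheduling(100_000)); // prints 100000
    }
}
```

The delayed queue therefore acts as the "place to save rejected tasks", at the cost of unbounded memory use if the consumer stays overloaded.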







[GitHub] [rocketmq] XiaoyiPeng removed a comment on pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng removed a comment on pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#issuecomment-969889104


   > Can you create a corresponding issue for this PR?
   
   OK, thanks for reminding me, please see: #3493





[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3490: 1.fix(consumer): DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on a change in pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#discussion_r749874437



##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
##########
@@ -61,14 +61,25 @@
     private final ScheduledExecutorService scheduledExecutorService;
     private final ScheduledExecutorService cleanExpireMsgExecutors;
 
+    /**
+     * The default retry delay in milliseconds when submitting a {@link ConsumeRequest} encounters RejectedExecutionException
+     */
+    private static final int DEFAULT_CONSUME_REQUEST_LATER_DELAY_MILLS = 5000;

Review comment:
       Can it be configurable?







[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#discussion_r749952325



##########
File path: client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
##########
@@ -163,6 +163,11 @@
      */
     private int consumeThreadMax = 20;
 
+    /**
+     * Queue size of consumption request;
+     */
+    private int consumeQueueSize = 10000;

Review comment:
       Thanks for reminding.
   
   The original rejection strategy is good: if the caller catches a RejectedExecutionException, it tries to submit again after a delay, and the delay increases exponentially until it reaches a maximum.
   This PR includes the implementation and unit tests to ensure that messages are not lost.
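
The retry-with-backoff idea described in this comment might look roughly like the sketch below. It is an assumption-laden illustration, not the code in this PR: the class `BackoffResubmitter`, the method names, the doubling factor, and the 60000 ms cap are all hypothetical; only the 5000 ms starting delay is taken from the thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; names and the maximum delay are assumptions for illustration.
class BackoffResubmitter {
    static final long INITIAL_DELAY_MILLIS = 5000; // the hard-coded value mentioned in the thread
    static final long MAX_DELAY_MILLIS = 60000;    // assumed cap, not taken from the PR

    private final ExecutorService worker;
    private final ScheduledExecutorService scheduler;
    private final long maxDelayMillis;

    BackoffResubmitter(ExecutorService worker, ScheduledExecutorService scheduler, long maxDelayMillis) {
        this.worker = worker;
        this.scheduler = scheduler;
        this.maxDelayMillis = maxDelayMillis;
    }

    // Doubles the delay, capped at `max`.
    static long nextDelay(long current, long max) {
        return Math.min(current * 2, max);
    }

    // Retries the submission after `delayMillis`; on rejection, reschedules with a longer delay.
    // Note: this keeps retrying for as long as the worker pool rejects the task.
    void submitLater(Runnable task, long delayMillis) {
        scheduler.schedule(() -> {
            try {
                worker.submit(task);
            } catch (RejectedExecutionException e) {
                submitLater(task, nextDelay(delayMillis, maxDelayMillis));
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Prints the delay sequence: 5000, 10000, 20000, 40000, 60000, 60000
        long delay = INITIAL_DELAY_MILLIS;
        for (int i = 0; i < 6; i++) {
            System.out.println(delay);
            delay = nextDelay(delay, MAX_DELAY_MILLIS);
        }
    }
}
```

Because the rejected task is always handed back to the scheduler, it is never dropped; the growing delay only limits how often an overloaded pool is retried.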







[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3490: [ISSUE #3493] DefaultMQPushConsumer#setConsumeThreadMax(int consumeThreadMax) has no effect.

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3490:
URL: https://github.com/apache/rocketmq/pull/3490#discussion_r749952397



##########
File path: client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
##########
@@ -62,9 +65,7 @@
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;

Review comment:
       Thanks for the reminder; the IDE may have added it automatically. I will correct it later.



