Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/07/13 07:00:53 UTC

[GitHub] [rocketmq] shengminw opened a new pull request, #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

shengminw opened a new pull request, #4601:
URL: https://github.com/apache/rocketmq/pull/4601

   [issue4599](https://github.com/apache/rocketmq/issues/4599)
   ## What is the purpose of the change
   In the current backpressure mechanism ([pull request #4553](https://github.com/apache/rocketmq/pull/4553)), the send task runs directly on the calling thread and relies on the backpressure in NettyRemoting:
   ```java
   if (this.defaultMQProducer.isEnableBackpressureForAsyncMode()) {
       runnable.run();
   } else {
       try {
           executor.submit(runnable);
       } catch (RejectedExecutionException e) {
           throw new MQClientException("executor rejected ", e);
       }
   }
   ```
   To make flow control more accurate, I rebuilt the backpressure mechanism so that it is based on the number of in-flight async sends and the total size of the messages they hold in memory.
   
   ## Brief changelog
   1. Use Semaphores to monitor the pressure of message sending: `semaphoreAsyncNum` limits the maximum number of on-going async sends, and `semaphoreAsyncSize` limits the maximum total message size of on-going async sends.
   2. Keep using the thread pool in the backpressure mechanism.
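
   The two limits in the changelog above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's actual code: the class name `AsyncSendGate` and its methods are hypothetical, and the caller is assumed to release the permits when the send completes (for example, from the send callback).

   ```java
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.TimeUnit;

   // Hypothetical helper, not part of RocketMQ: bounds both the number of
   // in-flight async sends and the total number of body bytes they hold.
   final class AsyncSendGate {
       private final Semaphore inFlightCount;
       private final Semaphore inFlightBytes;

       AsyncSendGate(int maxCount, int maxBytes) {
           this.inFlightCount = new Semaphore(maxCount, true);
           this.inFlightBytes = new Semaphore(maxBytes, true);
       }

       // Returns true only if both permits were obtained within timeoutMillis.
       boolean tryEnter(int msgBytes, long timeoutMillis) {
           long deadline = System.currentTimeMillis() + timeoutMillis;
           boolean countHeld = false;
           try {
               countHeld = inFlightCount.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
               if (!countHeld) {
                   return false;
               }
               long remaining = deadline - System.currentTimeMillis();
               if (inFlightBytes.tryAcquire(msgBytes, remaining, TimeUnit.MILLISECONDS)) {
                   return true;
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt(); // preserve the interrupt for the caller
           }
           if (countHeld) {
               inFlightCount.release(); // do not leak the count permit on failure
           }
           return false;
       }

       // Call exactly once per successful tryEnter.
       void exit(int msgBytes) {
           inFlightBytes.release(msgBytes);
           inFlightCount.release();
       }

       int availableCount() { return inFlightCount.availablePermits(); }
       int availableBytes() { return inFlightBytes.availablePermits(); }
   }
   ```

   Acquiring the count permit before the byte permits, and giving the count permit back when the byte acquisition fails, keeps a rejected send from permanently shrinking the window.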


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] shengminw commented on pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
shengminw commented on PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#issuecomment-1184105162

   > The backpressure mechanism is a very useful feature, but at present, when RocketMQ traffic is close to the bottleneck, it drops sharply due to "busy" errors. I hope this can be improved. Can you share the improvement this PR brings, such as test results?
   
   @guyinyou 
   IMO, the current design is indeed close to the bottleneck, but message processing is continuous, so it tends toward a steady state. The number of successfully written messages per second does not change much; only the sending rate is reduced. Without backpressure, there would still be many rejections.
   Because of limited test equipment, I only ran a simple test on my laptop.
   I set the producer to send asynchronous messages continuously from 4 threads.
   At the same time, a separate thread monitored the number of successfully sent, timed-out, and rejected messages in each 1s interval.
   
   * With **enableBackpressureForAsyncMode off**:
   ![off-enable](https://user-images.githubusercontent.com/107378995/178924477-1685981b-1836-4371-adb0-4bbbb1ffeec8.png)
   
   * With **enableBackpressureForAsyncMode on**:
   ![on-enable](https://user-images.githubusercontent.com/107378995/178924590-6766e957-d530-44d5-8767-409666a544a5.png)
   
   Although a local test cannot fully simulate a heavy-traffic server environment, it still shows that after turning on backpressure, **the async sending rate is reduced while the rate of successful message writes remains unchanged. In addition, a large number of rejections is avoided.**
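
   The per-second monitoring used in this test could be sketched as below. This is only an assumption about how such counters might be kept; the class `SendStats` is hypothetical, and how a failure is classified as a timeout or a rejection is left to the send callback.

   ```java
   import java.util.concurrent.atomic.LongAdder;

   // Hypothetical counters for a benchmark: updated from send callbacks,
   // read and reset by a monitoring thread once per interval.
   final class SendStats {
       private final LongAdder success = new LongAdder();
       private final LongAdder timeout = new LongAdder();
       private final LongAdder rejected = new LongAdder();

       void onSuccess() { success.increment(); }
       void onTimeout() { timeout.increment(); }
       void onRejected() { rejected.increment(); }

       // Returns {success, timeout, rejected} counted since the previous call.
       long[] snapshotAndReset() {
           return new long[] {
               success.sumThenReset(), timeout.sumThenReset(), rejected.sumThenReset()
           };
       }
   }
   ```

   A monitoring thread would call `snapshotAndReset()` once per second, for example from a `ScheduledExecutorService.scheduleAtFixedRate` task, and print the three values.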
   




[GitHub] [rocketmq] guyinyou commented on pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
guyinyou commented on PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#issuecomment-1184192989

   > 
   
   The effect is still very obvious, well done.




[GitHub] [rocketmq] dongeforever commented on a diff in pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
dongeforever commented on code in PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#discussion_r924007007


##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -491,27 +506,66 @@ public void send(Message msg,
     public void send(final Message msg, final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
         final long beginStartTime = System.currentTimeMillis();
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                if (timeout > costTime) {
+                    try {
+                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
+                    } catch (Exception e) {
+                        sendCallback.onException(e);
+                    }
+                } else {
+                    sendCallback.onException(
+                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
+                }
+            }
+        };
+        executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
+    }
+
+    public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback,
+                                         final long timeout, final long beginStartTime)
+            throws MQClientException, InterruptedException {
         ExecutorService executor = this.getAsyncSenderExecutor();
+        boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
+        boolean isSemaphoreAsyncNumAquired = false;
+        boolean isSemaphoreAsyncSizeAquired = false;
+        int msgLen = msg.getBody() == null ? 1 : msg.getBody().length;
+
         try {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    long costTime = System.currentTimeMillis() - beginStartTime;
-                    if (timeout > costTime) {
-                        try {
-                            sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
-                        } catch (Exception e) {
-                            sendCallback.onException(e);
-                        }
-                    } else {
-                        sendCallback.onException(
-                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
-                    }
+            if (isEnableBackpressureForAsyncMode) {
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                isSemaphoreAsyncNumAquired = semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
+                if (!isSemaphoreAsyncNumAquired) {
+                    sendCallback.onException(
+                            new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
+                    return;
+                }
+                costTime = System.currentTimeMillis() - beginStartTime;
+                isSemaphoreAsyncSizeAquired = semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
+                if (!isSemaphoreAsyncSizeAquired) {
+                    sendCallback.onException(
+                            new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
+                    return;
                 }

Review Comment:
   What will happen if `timeout - costTime` is negative?



##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -142,6 +147,8 @@ public Thread newThread(Runnable r) {
                     return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                 }
             });
+        semaphoreAsyncSendNum = new Semaphore(defaultMQProducer.getBackPressureForAsyncSendNum(), true);
+        semaphoreAsyncSendSize = new Semaphore(defaultMQProducer.getBackPressureForAsyncSendSize(), true);
     }

Review Comment:
   Maybe we need to do some checks for illegal arguments.
   
   for example:
   asyncSendNum cannot be smaller than 10;
   asyncSize cannot be smaller than 1M;
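
   A check along these lines could look like the sketch below. The lower bounds of 10 and 1M come from the comment above; everything else is an assumption: the class `BackpressureLimits` is hypothetical, and clamping to the minimum (rather than throwing an exception) is just one possible policy.

   ```java
   import java.util.concurrent.Semaphore;

   // Hypothetical validation helper, not the PR's code.
   final class BackpressureLimits {
       static final int MIN_ASYNC_SEND_NUM = 10;
       static final int MIN_ASYNC_SEND_SIZE = 1024 * 1024; // 1M

       // Clamp values that are too small instead of rejecting them.
       static int checkedNum(int requested) {
           return Math.max(requested, MIN_ASYNC_SEND_NUM);
       }

       static int checkedSize(int requested) {
           return Math.max(requested, MIN_ASYNC_SEND_SIZE);
       }

       // Build the fair semaphores from validated values.
       static Semaphore newNumSemaphore(int requested) {
           return new Semaphore(checkedNum(requested), true);
       }

       static Semaphore newSizeSemaphore(int requested) {
           return new Semaphore(checkedSize(requested), true);
       }
   }
   ```

   Clamping keeps a misconfigured producer usable, while throwing `IllegalArgumentException` would surface the mistake earlier; which one fits better depends on how the setters are expected to behave.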
   





[GitHub] [rocketmq] shengminw commented on pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
shengminw commented on PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#issuecomment-1184237038

   @guyinyou It seems that your approach is similar to my implementation: both use a semaphore to control the number of concurrent asynchronous sends.
   Your idea about dynamic rateLimiter is a great suggestion. Do you mean a dynamic threshold to control whether to block sending?




[GitHub] [rocketmq] guyinyou commented on pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
guyinyou commented on PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#issuecomment-1184206529

   When I benchmarked RocketMQ before, I also needed backpressure. My approach was to maintain a semaphore that controls the number of concurrent asynchronous sends, which worked very well. You can try it.
   
   ```java
    // Rough benchmark sketch; windowsSize (the maximum number of in-flight sends)
    // is assumed to be defined elsewhere.
    UniformRateLimiter rateLimiter = new UniformRateLimiter(1024);
    AtomicLong windowsCnt = new AtomicLong(0);
    while (true) {
       try {
           // Count this send as in flight; if the window is full, spin until
           // completed sends free a slot.
           if (windowsCnt.incrementAndGet() >= windowsSize) {
               while (windowsCnt.get() >= windowsSize) {
                   Thread.yield();
               }
           }
           producer.send(new Message(xxxxxxx), new SendCallback() {
               @Override public void onSuccess(SendResult sendResult) {
                   windowsCnt.decrementAndGet(); // slot freed on success
               }
   
               @Override public void onException(Throwable e) {
                   windowsCnt.decrementAndGet(); // slot freed on failure as well
                   throw new RuntimeException(e);
               }
           });
       } catch (Exception e) {
           e.printStackTrace();
       }
    }
   ```
   Of course, this implementation is very rough. In real scenarios, this rateLimiter should be a dynamic value that is increased or decreased according to the number of recently failed sends, the send latency, and so on. If you are interested, we can discuss it together.
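
   One possible reading of such a dynamic limit is an additive-increase/multiplicative-decrease (AIMD) rule. AIMD is chosen here purely for illustration; the comment above does not name a specific algorithm. The class `AdaptiveWindow`, its parameters, and the latency threshold are all hypothetical.

   ```java
   // Hypothetical sketch: an in-flight limit that grows slowly while sends
   // succeed quickly and halves when a send fails or is slow.
   final class AdaptiveWindow {
       private final int min;
       private final int max;
       private final long slowMillis;
       private int limit;

       AdaptiveWindow(int initial, int min, int max, long slowMillis) {
           this.min = min;
           this.max = max;
           this.slowMillis = slowMillis;
           this.limit = Math.max(min, Math.min(max, initial));
       }

       // Feed the outcome of every completed send into the window.
       synchronized void onResult(boolean success, long latencyMillis) {
           if (!success || latencyMillis > slowMillis) {
               limit = Math.max(min, limit / 2); // multiplicative decrease
           } else {
               limit = Math.min(max, limit + 1); // additive increase
           }
       }

       synchronized int limit() {
           return limit;
       }
   }
   ```

   A sender would compare its in-flight counter with `limit()` before each send and wait while the counter is at or above it, much like the fixed `windowsSize` check in the snippet above.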




[GitHub] [rocketmq] guyinyou commented on pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
guyinyou commented on PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#issuecomment-1184319084

   > @guyinyou It seems that your approach is similar to my implementation: both use a semaphore to control the number of concurrent asynchronous sends. Your idea about dynamic rateLimiter is a great suggestion. Do you mean a dynamic threshold to control whether to block sending?
   
   Yes, and send failures and send delays alone are enough as inputs.




[GitHub] [rocketmq] guyinyou commented on pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
guyinyou commented on PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#issuecomment-1183921782

   The backpressure mechanism is a very useful feature, but at present, when RocketMQ traffic is close to the bottleneck, it drops sharply due to "busy" errors. I hope this can be improved. Can you share the improvement this PR brings, such as test results?




[GitHub] [rocketmq] codecov-commenter commented on pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#issuecomment-1186903220

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4601?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#4601](https://codecov.io/gh/apache/rocketmq/pull/4601?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (73a204d) into [develop](https://codecov.io/gh/apache/rocketmq/commit/2f5fd91a6fbd5d92b87bf7c10e0c2ae5ae0357a3?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2f5fd91) will **decrease** coverage by `4.68%`.
   > The diff coverage is `73.23%`.
   
   > :exclamation: Current head 73a204d differs from pull request most recent head d308031. Consider uploading reports for the commit d308031 to get more accurate results
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #4601      +/-   ##
   =============================================
   - Coverage      48.19%   43.50%   -4.69%     
   - Complexity      5128     6227    +1099     
   =============================================
     Files            649      817     +168     
     Lines          43037    57706   +14669     
     Branches        5629     7882    +2253     
   =============================================
   + Hits           20742    25107    +4365     
   - Misses         19788    29360    +9572     
   - Partials        2507     3239     +732     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/4601?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...mq/client/impl/producer/DefaultMQProducerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9wcm9kdWNlci9EZWZhdWx0TVFQcm9kdWNlckltcGwuamF2YQ==) | `45.28% <69.49%> (+0.85%)` | :arrow_up: |
   | [...he/rocketmq/client/producer/DefaultMQProducer.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvcHJvZHVjZXIvRGVmYXVsdE1RUHJvZHVjZXIuamF2YQ==) | `60.53% <91.66%> (+2.44%)` | :arrow_up: |
   | [...main/java/org/apache/rocketmq/store/StoreUtil.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL1N0b3JlVXRpbC5qYXZh) | `21.42% <0.00%> (-50.00%)` | :arrow_down: |
   | [.../org/apache/rocketmq/common/ThreadFactoryImpl.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vVGhyZWFkRmFjdG9yeUltcGwuamF2YQ==) | `55.55% <0.00%> (-44.45%)` | :arrow_down: |
   | [...s/command/consumer/ConsumerProgressSubCommand.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dG9vbHMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Rvb2xzL2NvbW1hbmQvY29uc3VtZXIvQ29uc3VtZXJQcm9ncmVzc1N1YkNvbW1hbmQuamF2YQ==) | `0.00% <0.00%> (-28.86%)` | :arrow_down: |
   | [...va/org/apache/rocketmq/store/PutMessageResult.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL1B1dE1lc3NhZ2VSZXN1bHQuamF2YQ==) | `39.13% <0.00%> (-27.54%)` | :arrow_down: |
   | [...che/rocketmq/common/protocol/body/ClusterInfo.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vcHJvdG9jb2wvYm9keS9DbHVzdGVySW5mby5qYXZh) | `62.50% <0.00%> (-25.74%)` | :arrow_down: |
   | [...org/apache/rocketmq/broker/out/BrokerOuterAPI.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvb3V0L0Jyb2tlck91dGVyQVBJLmphdmE=) | `21.56% <0.00%> (-24.91%)` | :arrow_down: |
   | [...pache/rocketmq/store/stats/BrokerStatsManager.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3N0YXRzL0Jyb2tlclN0YXRzTWFuYWdlci5qYXZh) | `47.83% <0.00%> (-24.04%)` | :arrow_down: |
   | [...ketmq/client/impl/consumer/PullMessageService.java](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9QdWxsTWVzc2FnZVNlcnZpY2UuamF2YQ==) | `49.23% <0.00%> (-21.89%)` | :arrow_down: |
   | ... and [376 more](https://codecov.io/gh/apache/rocketmq/pull/4601/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/4601?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/4601?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [2f5fd91...d308031](https://codecov.io/gh/apache/rocketmq/pull/4601?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [rocketmq] shengminw commented on a diff in pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
shengminw commented on code in PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#discussion_r923080145


##########
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java:
##########
@@ -133,6 +134,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      */
     private TraceDispatcher traceDispatcher = null;
 
+    /**
+     * Indicate whether to block message when asynchronous sending traffic is too heavy.
+     */
+    private boolean enableBackpressureForAsyncMode = false;
+
+    /**
+     * on BackpressureForAsyncMode, limit maximum number of on-going sending async messages
+     */
+    private Semaphore semaphoreAsyncNum = new Semaphore(60000, true);
+

Review Comment:
   It's a good suggestion. I have fixed it.





[GitHub] [rocketmq] dongeforever commented on a diff in pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
dongeforever commented on code in PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#discussion_r923036244


##########
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java:
##########
@@ -133,6 +134,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      */
     private TraceDispatcher traceDispatcher = null;
 
+    /**
+     * Indicate whether to block message when asynchronous sending traffic is too heavy.
+     */
+    private boolean enableBackpressureForAsyncMode = false;
+
+    /**
+     * on BackpressureForAsyncMode, limit maximum number of on-going sending async messages
+     */
+    private Semaphore semaphoreAsyncNum = new Semaphore(60000, true);
+

Review Comment:
   Expose the integer value instead of the Semaphore directly.
   
   It is better to initialize the semaphore in the constructor of DefaultMQProducerImpl.



##########
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java:
##########
@@ -133,6 +134,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      */
     private TraceDispatcher traceDispatcher = null;
 
+    /**
+     * Indicate whether to block message when asynchronous sending traffic is too heavy.
+     */
+    private boolean enableBackpressureForAsyncMode = false;
+
+    /**
+     * on BackpressureForAsyncMode, limit maximum number of on-going sending async messages
+     */
+    private Semaphore semaphoreAsyncNum = new Semaphore(60000, true);
+
+    /**
+     * on BackpressureForAsyncMode, limit maximum message size of on-going sending async messages
+     */
+    private Semaphore semaphoreAsyncSize = new Semaphore(512 * 1024 * 1024, true);
+

Review Comment:
   A default of 100M is enough.



##########
client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java:
##########
@@ -133,6 +134,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      */
     private TraceDispatcher traceDispatcher = null;
 
+    /**
+     * Indicate whether to block message when asynchronous sending traffic is too heavy.
+     */
+    private boolean enableBackpressureForAsyncMode = false;
+
+    /**
+     * on BackpressureForAsyncMode, limit maximum number of on-going sending async messages
+     */
+    private Semaphore semaphoreAsyncNum = new Semaphore(60000, true);
+

Review Comment:
   A default of 10000 is enough.





[GitHub] [rocketmq] shengminw commented on a diff in pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
shengminw commented on code in PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#discussion_r924113814


##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -491,27 +506,66 @@ public void send(Message msg,
     public void send(final Message msg, final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
         final long beginStartTime = System.currentTimeMillis();
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                if (timeout > costTime) {
+                    try {
+                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
+                    } catch (Exception e) {
+                        sendCallback.onException(e);
+                    }
+                } else {
+                    sendCallback.onException(
+                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
+                }
+            }
+        };
+        executeAsyncMessageSend(runnable, msg, sendCallback, timeout, beginStartTime);
+    }
+
+    public void executeAsyncMessageSend(Runnable runnable, final Message msg, final SendCallback sendCallback,
+                                         final long timeout, final long beginStartTime)
+            throws MQClientException, InterruptedException {
         ExecutorService executor = this.getAsyncSenderExecutor();
+        boolean isEnableBackpressureForAsyncMode = this.getDefaultMQProducer().isEnableBackpressureForAsyncMode();
+        boolean isSemaphoreAsyncNumAquired = false;
+        boolean isSemaphoreAsyncSizeAquired = false;
+        int msgLen = msg.getBody() == null ? 1 : msg.getBody().length;
+
         try {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    long costTime = System.currentTimeMillis() - beginStartTime;
-                    if (timeout > costTime) {
-                        try {
-                            sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
-                        } catch (Exception e) {
-                            sendCallback.onException(e);
-                        }
-                    } else {
-                        sendCallback.onException(
-                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
-                    }
+            if (isEnableBackpressureForAsyncMode) {
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                isSemaphoreAsyncNumAquired = semaphoreAsyncSendNum.tryAcquire(timeout - costTime, TimeUnit.MILLISECONDS);
+                if (!isSemaphoreAsyncNumAquired) {
+                    sendCallback.onException(
+                            new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncNum timeout"));
+                    return;
+                }
+                costTime = System.currentTimeMillis() - beginStartTime;
+                isSemaphoreAsyncSizeAquired = semaphoreAsyncSendSize.tryAcquire(msgLen, timeout - costTime, TimeUnit.MILLISECONDS);
+                if (!isSemaphoreAsyncSizeAquired) {
+                    sendCallback.onException(
+                            new RemotingTooMuchRequestException("send message tryAcquire semaphoreAsyncSize timeout"));
+                    return;
                 }

Review Comment:
   In `tryAcquire`, a non-positive timeout means the call does not wait at all, so it returns false unless a permit is immediately available. I also added a conditional check to ensure that `timeout - costTime` is positive.
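
   How `Semaphore.tryAcquire(long, TimeUnit)` treats a negative timeout can be checked with a small standalone program. Per the JDK documentation, a time less than or equal to zero means the method does not wait at all, so the result depends only on whether a permit is free at that moment. The class `NegativeTimeoutDemo` is a hypothetical name used for this demo only.

   ```java
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.TimeUnit;

   final class NegativeTimeoutDemo {
       // Tries once with the given timeout; an interruption is reported as failure.
       static boolean tryOnce(Semaphore semaphore, long timeoutMillis) {
           try {
               return semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return false;
           }
       }

       public static void main(String[] args) {
           Semaphore free = new Semaphore(1, true);      // one permit available
           Semaphore exhausted = new Semaphore(0, true); // no permits available

           // A permit is free, so it is taken without waiting.
           System.out.println("free, timeout -5ms: " + tryOnce(free, -5));
           // No permit is free and no waiting is allowed, so the call fails at once.
           System.out.println("exhausted, timeout -5ms: " + tryOnce(exhausted, -5));
       }
   }
   ```

   Because a negative timeout can still succeed when a permit is free, an explicit guard on `timeout - costTime` is what actually enforces the caller's deadline.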





[GitHub] [rocketmq] dongeforever merged pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
dongeforever merged PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601




[GitHub] [rocketmq] shengminw commented on a diff in pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
shengminw commented on code in PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#discussion_r921971740


##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -491,27 +492,68 @@ public void send(Message msg,
     public void send(final Message msg, final SendCallback sendCallback, final long timeout)

Review Comment:
   Yes, this method has been marked as `deprecated` for some time, but there is no new method to replace it yet, so I decided to change it for now.





[GitHub] [rocketmq] Oliverwqcwrw commented on a diff in pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
Oliverwqcwrw commented on code in PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#discussion_r921967162


##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -491,27 +492,68 @@ public void send(Message msg,
     public void send(final Message msg, final SendCallback sendCallback, final long timeout)

Review Comment:
   I notice that this method is marked as deprecated. Is there any need to modify it? It may be removed in the future. WDYT?





[GitHub] [rocketmq] coveralls commented on pull request #4601: [ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Posted by GitBox <gi...@apache.org>.
coveralls commented on PR #4601:
URL: https://github.com/apache/rocketmq/pull/4601#issuecomment-1183211915

   
   [![Coverage Status](https://coveralls.io/builds/50830024/badge)](https://coveralls.io/builds/50830024)
   
   Coverage decreased (-0.09%) to 47.537% when pulling **84426f77680fb03add8a35b6a6f6c817bce3be3e on shengminw:backpressure-2** into **10326b479af2282310ea0f496f6c08229e4ae126 on apache:develop**.
   

