Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/12/24 09:49:37 UTC

[GitHub] [rocketmq] xxd763795151 opened a new issue #2516: [BUG][4.7.1]the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

xxd763795151 opened a new issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516


   **BUG REPORT**
   
   1. Please describe the issue you observed:
   
   When a send-message request stays in the sendThreadPoolQueue for too long, the broker rejects it with the error "[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while". The code is as follows:
   ```
   final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
   if (behind >= maxWaitTimeMillsInQueue) {
       if (blockingQueue.remove(runnable)) {
           rt.setStopRun(true);
           rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
       }
   }
   ```
   The default value of maxWaitTimeMillsInQueue is 200 ms.
   We set it to 1000 ms in our production environment, but this error still happens occasionally.
   We monitor sendThreadPoolQueueHeadWaitTimeMills with rocketmq-exporter + Prometheus + Grafana, yet the value is almost always 0 (with an occasional very high spike). That does not make sense given the errors we see.
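   The fast-failure check quoted above can be modeled in isolation. The sketch below is a simplified, hypothetical model, not the broker's code: TimedTask stands in for RequestTask, rejectExpiredHead is an invented name, and the current time is passed in as a parameter so the result is deterministic. It drops head tasks that have waited at least maxWaitMillis and stops at the first element it cannot read a timestamp from.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   // Hypothetical stand-in for RequestTask: only the creation time matters here.
   final class TimedTask implements Runnable {
       final long createTimestamp;
       volatile boolean stopRun;

       TimedTask(long createTimestamp) {
           this.createTimestamp = createTimestamp;
       }

       @Override
       public void run() {
           // A real task would process the request; nothing to do in this model.
       }
   }

   final class ExpiryCheck {
       // Removes head tasks that waited at least maxWaitMillis and returns how many
       // were rejected. "now" is a parameter so the behavior is deterministic.
       static int rejectExpiredHead(BlockingQueue<Runnable> queue, long maxWaitMillis, long now) {
           int rejected = 0;
           while (true) {
               Runnable head = queue.peek();
               if (!(head instanceof TimedTask)) {
                   break; // empty queue, or an element this model cannot inspect
               }
               TimedTask task = (TimedTask) head;
               long behind = now - task.createTimestamp;
               if (behind < maxWaitMillis) {
                   break; // FIFO queue: tasks further back have waited even less
               }
               if (queue.remove(head)) {
                   task.stopRun = true; // the broker would also answer SYSTEM_BUSY here
                   rejected++;
               }
           }
           return rejected;
       }
   }
   ```

   For example, with tasks created at 0, 100 and 900 ms, a limit of 200 ms and now = 1000, the first two tasks are rejected and the third stays queued.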
   
   While debugging the broker's source code, I found that the sendThreadPoolQueue holds two types of elements:
   > java.util.concurrent.CompletableFuture$UniAccept
   > org.apache.rocketmq.broker.latency.FutureTaskExt
   
   If the head element of sendThreadPoolQueue is an org.apache.rocketmq.broker.latency.FutureTaskExt, the broker computes sendThreadPoolQueueHeadWaitTimeMills from it; otherwise, it returns 0. See the source code below:
   ```
   // BrokerController.java
       public long headSlowTimeMills(BlockingQueue<Runnable> q) {
           long slowTimeMills = 0;
           final Runnable peek = q.peek();
           if (peek != null) {
               RequestTask rt = BrokerFastFailure.castRunnable(peek);
               slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
           }
   
           if (slowTimeMills < 0) {
               slowTimeMills = 0;
           }
   
           return slowTimeMills;
       }
   ```
   The deciding line is BrokerFastFailure.castRunnable(peek), which only unwraps a FutureTaskExt and returns null for anything else:
   ```
       public static RequestTask castRunnable(final Runnable runnable) {
           try {
               if (runnable instanceof FutureTaskExt) {
                   FutureTaskExt object = (FutureTaskExt) runnable;
                   return (RequestTask) object.getRunnable();
               }
           } catch (Throwable e) {
               log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
           }
   
           return null;
       }
   
   ```
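   To make the effect of peek() concrete, here is a minimal, hypothetical model of headSlowTimeMills combined with castRunnable. StampedTask stands in for a FutureTaskExt wrapping a RequestTask, and the lambda stands in for a CompletableFuture$UniAccept; neither is a RocketMQ class, and the current time is passed in as a parameter.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   // Hypothetical stand-in for a FutureTaskExt wrapping a RequestTask.
   final class StampedTask implements Runnable {
       final long createTimestamp;

       StampedTask(long createTimestamp) {
           this.createTimestamp = createTimestamp;
       }

       @Override
       public void run() { }
   }

   final class PeekModel {
       // Same shape as headSlowTimeMills: only the head element is inspected.
       static long headSlowTimeMillis(BlockingQueue<Runnable> q, long now) {
           long slow = 0;
           Runnable peek = q.peek();
           if (peek instanceof StampedTask) { // castRunnable() yields null otherwise
               slow = now - ((StampedTask) peek).createTimestamp;
           }
           return Math.max(slow, 0);
       }

       public static void main(String[] args) {
           BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
           q.add(() -> { });            // plays the role of CompletableFuture$UniAccept
           q.add(new StampedTask(100)); // a request that has waited 400 ms at now = 500
           System.out.println(headSlowTimeMillis(q, 500)); // prints 0
       }
   }
   ```

   Even though the second element has waited 400 ms, the reported value is 0 because only the head is examined.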
   
   The java.util.concurrent.CompletableFuture$UniAccept elements come from SendMessageProcessor.java:
   ```
   @Override
   public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
       asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
   }
   ```
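   Because thenAcceptAsync is given the send-message executor, the response callback is enqueued as a CompletableFuture$UniAccept in the same sendThreadPoolQueue as the incoming requests. Most of the time the head element is a UniAccept, so castRunnable returns null and headSlowTimeMills reports 0, even when requests behind it have been waiting a long time.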
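   The queue sharing can be reproduced with the JDK alone. In this sketch (SharedQueueDemo is a hypothetical name), a single-thread ThreadPoolExecutor plays the send executor: its only worker is blocked, then one task is submitted and one thenAcceptAsync callback is registered on the same executor. A plain submit() wraps the task in java.util.concurrent.FutureTask, whereas the broker's own executor wraps requests in FutureTaskExt; that substitution is an assumption of the model.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   final class SharedQueueDemo {
       // Returns the class names of the tasks waiting in the executor's queue after
       // one submit() and one thenAcceptAsync() on the same executor.
       static List<String> queuedTaskClasses() {
           LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
           ThreadPoolExecutor executor =
               new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue);
           CountDownLatch release = new CountDownLatch(1);
           try {
               // Occupy the only worker so that later tasks stay in the queue.
               executor.execute(() -> {
                   try {
                       release.await();
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               });
               // Like an incoming request: submit() wraps it in a FutureTask.
               executor.submit(() -> { });
               // Like SendMessageProcessor: the callback is queued on the same executor.
               CompletableFuture.completedFuture("response").thenAcceptAsync(r -> { }, executor);

               List<String> names = new ArrayList<>();
               for (Runnable r : queue) {
                   names.add(r.getClass().getName());
               }
               return names;
           } finally {
               release.countDown(); // let the worker drain the queue
               executor.shutdown();
           }
       }

       public static void main(String[] args) {
           queuedTaskClasses().forEach(System.out::println);
       }
   }
   ```

   On the JDK 8 runtime used in the report, the second name should be java.util.concurrent.CompletableFuture$UniAccept, matching the debug output in this issue.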
   
   
   2. Please tell us about your environment:
   
   Linux
   macOS
   RocketMQ 4.7.1 release
   
   3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
   
   How I found this information: I printed the contents of the sendThreadPoolQueue with code such as the following:
   ```
   // BrokerController#headSlowTimeMills(BlockingQueue<Runnable> q)
   ........
           if (q == this.sendThreadPoolQueue) {
               System.out.println("send queue foreach size: " + q.size());
               q.stream().forEach(r -> {
                   long tmpSlowTime = 0l;
                   RequestTask rt = BrokerFastFailure.castRunnable(r);
                   System.out.println(r.getClass());
                   tmpSlowTime = rt == null ? -1 : this.messageStore.now() - rt.getCreateTimestamp();
                   System.out.println(tmpSlowTime);
               });
               //System.out.println("Send queue slow time mills: " + slowTimeMills);
           }
   .......
   ```
   This is the printed output:
   > send queue foreach size: 4
   > class java.util.concurrent.CompletableFuture$UniAccept
   > -1
   > class org.apache.rocketmq.broker.latency.FutureTaskExt
   > 387
   > class java.util.concurrent.CompletableFuture$UniAccept
   > -1
   > class org.apache.rocketmq.broker.latency.FutureTaskExt
   > 80
   
   I also printed a stack trace every time a task was enqueued, by overriding the queue's methods:
   ```
           this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()) {
               @Override
               public void put(Runnable runnable) throws InterruptedException {
                   System.out.println("queue put: " + runnable.getClass());
                   super.put(runnable);
               }
   
               @Override
               public boolean offer(Runnable runnable) {
                   System.out.println("queue offer: " + runnable.getClass() + ", current thread: " + Thread.currentThread().getName() + ", thread id: " + Thread.currentThread().getId());
                   Throwable throwable = new Throwable();
                   StackTraceElement[] stackTraceElements = throwable.getStackTrace();
                   if (stackTraceElements != null) {
                       Arrays.stream(stackTraceElements).forEach(stackTraceElement -> {
                           System.out.println(stackTraceElement.getClassName() + "#"
                                   + stackTraceElement.getMethodName() + "#" + stackTraceElement.getLineNumber());
                       });
                   }
                   System.out.println("---------------------------end------------------------------");
                   return super.offer(runnable);
               }
   
               @Override
               public boolean offer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
                   System.out.println("queue timeoutoffer: " + runnable.getClass());
                   return super.offer(runnable, timeout, unit);
               }
           };
   ```
   The output was as follows:
   > queue offer: class java.util.concurrent.CompletableFuture$UniAccept, current thread: SendMessageThread_1, thread id: 81
   > org.apache.rocketmq.broker.BrokerController$1#offer#205
   > org.apache.rocketmq.broker.BrokerController$1#offer#195
   > java.util.concurrent.ThreadPoolExecutor#execute#1371
   > java.util.concurrent.CompletableFuture$UniCompletion#claim#543
   > java.util.concurrent.CompletableFuture#uniAccept#667
   > java.util.concurrent.CompletableFuture$UniAccept#tryFire$$$capture#646
   > java.util.concurrent.CompletableFuture$UniAccept#tryFire#-1
   > java.util.concurrent.CompletableFuture#uniAcceptStage#686
   > java.util.concurrent.CompletableFuture#thenAcceptAsync#2019
   > org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest#82
   > org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1#run#226
   > org.apache.rocketmq.remoting.netty.RequestTask#run#80
   > java.util.concurrent.Executors$RunnableAdapter#call#511
   > java.util.concurrent.FutureTask#run$$$capture#266
   > java.util.concurrent.FutureTask#run#-1
   > java.util.concurrent.ThreadPoolExecutor#runWorker#1149
   > java.util.concurrent.ThreadPoolExecutor$Worker#run#624
   > java.lang.Thread#run#748
   
   > 
   > queue offer: class org.apache.rocketmq.broker.latency.FutureTaskExt, current thread: NettyServerCodecThread_5, thread id: 56
   > org.apache.rocketmq.broker.BrokerController$1#offer#205
   > org.apache.rocketmq.broker.BrokerController$1#offer#195
   > java.util.concurrent.ThreadPoolExecutor#execute#1371
   > java.util.concurrent.AbstractExecutorService#submit#112
   > org.apache.rocketmq.broker.BrokerController$2#submit#304
   > org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand#256
   > org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived#158
   > org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#420
   > org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#415


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] vongosling commented on issue #2516: The value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
vongosling commented on issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516#issuecomment-752816036


   Great find~ I would recommend using conditional iteration instead of creating another thread pool. There are too many blocking queues and executor services now; it's almost chaotic.~
   
   Looking back, the fail-fast mechanism we built on the broker side is a little crude. I have been thinking about optimizing it. If you have better ideas, you are welcome to propose them. For example, you could use better algorithms and data structures, such as those in Resilience4j...





[GitHub] [rocketmq] xxd763795151 commented on issue #2516: The value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
xxd763795151 commented on issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516#issuecomment-752417190


   > > > Maybe we could switch to a separate thread pool when we use thenAcceptAsync @xxd763795151
   > > 
   > > 
   > > Of course, this may be the quickest way, if its scope of influence is small enough.
   > > Think about it another way: modify the [ public long headSlowTimeMills(BlockingQueue<Runnable> q)] method.
   > > Iterate over the queue and find the first element that meets the condition (or null if there is none) to calculate the time, instead of using [peek()] (taking the impact of concurrency into account).
   > 
   > Could you submit a pull request to fix the issue? I will review the code and help you merge.
   
   Ok, I will verify the scheme and give you feedback later. @RongtongJin 





[GitHub] [rocketmq] RongtongJin commented on issue #2516: [4.7.1]the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516#issuecomment-751156068


   Good catch! It is indeed a bug, do you have any ideas to fix it ?







[GitHub] [rocketmq] RongtongJin closed issue #2516: The value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
RongtongJin closed issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516


   





[GitHub] [rocketmq] xxd763795151 commented on issue #2516: [4.7.1]the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
xxd763795151 commented on issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516#issuecomment-751481835


   > Maybe we could switch to a separate thread pool when we use thenAcceptAsync @xxd763795151
   
   Of course, this may be the quickest way, if its scope of influence is small enough.
   Think about it another way: modify the [ public long headSlowTimeMills(BlockingQueue<Runnable> q)] method.
   Iterate over the queue and find the first element that meets the condition (or null if there is none) to calculate the time, instead of using [peek()] (taking the impact of concurrency into account).
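   The iteration approach proposed here can be sketched as follows. This is a minimal sketch, not the merged fix: QueuedRequest is a hypothetical stand-in for a FutureTaskExt wrapping a RequestTask, and the current time is passed in. Instead of peek(), it walks the queue and measures the first element that carries a creation timestamp.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   // Hypothetical stand-in for a FutureTaskExt that wraps a RequestTask.
   final class QueuedRequest implements Runnable {
       final long createTimestamp;

       QueuedRequest(long createTimestamp) {
           this.createTimestamp = createTimestamp;
       }

       @Override
       public void run() { }
   }

   final class HeadWaitTime {
       // Skips elements without a creation time (e.g. async callbacks) and returns
       // the wait of the first request found, or 0 if the queue holds none.
       static long headSlowTimeMillis(BlockingQueue<Runnable> q, long now) {
           // LinkedBlockingQueue's iterator is weakly consistent: it never throws
           // ConcurrentModificationException, but it may or may not reflect
           // elements added or removed while iterating.
           for (Runnable r : q) {
               if (r instanceof QueuedRequest) {
                   return Math.max(now - ((QueuedRequest) r).createTimestamp, 0);
               }
           }
           return 0;
       }
   }
   ```

   The result is a best-effort snapshot under concurrent puts and takes. In the worst case (no request in the queue) the whole queue is walked, which is bounded by the configured sendThreadPoolQueueCapacity.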





[GitHub] [rocketmq] RongtongJin commented on issue #2516: The value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516#issuecomment-977447850


   Merged





[GitHub] [rocketmq] RongtongJin commented on issue #2516: [4.7.1]the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516#issuecomment-751657399


   > > Maybe we could switch to a separate thread pool when we use thenAcceptAsync @xxd763795151
   > 
   > Of course, this may be the quickest way, if its scope of influence is small enough.
   > Think about it another way: modify the [ public long headSlowTimeMills(BlockingQueue<Runnable> q)] method.
   > Iterate over the queue and find the first element that meets the condition (or null if there is none) to calculate the time, instead of using [peek()] (taking the impact of concurrency into account).
   
   Could you submit a pull request to fix the issue? I will review the code and help you merge.





[GitHub] [rocketmq] RongtongJin commented on issue #2516: [4.7.1]the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516#issuecomment-751464251


   Maybe we could switch to a separate thread pool when we use thenAcceptAsync @xxd763795151
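   The separate-thread-pool alternative can be sketched with the JDK alone. In this hypothetical model (SeparateCallbackPool and callbackExecutor are invented names), response callbacks run on their own executor, so the send queue holds only request tasks. In the broker this would mean the head of sendThreadPoolQueue is always a FutureTaskExt, assuming nothing else submits to the send executor.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   final class SeparateCallbackPool {
       // Returns how many tasks wait in the send queue after one request and one
       // response callback, when callbacks run on a dedicated executor.
       static int sendQueueSizeWithSeparateCallbacks() {
           LinkedBlockingQueue<Runnable> sendQueue = new LinkedBlockingQueue<>();
           ThreadPoolExecutor sendExecutor =
               new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, sendQueue);
           // Hypothetical dedicated pool for response callbacks.
           ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
           CountDownLatch release = new CountDownLatch(1);
           try {
               // Occupy the only send worker so the request stays queued.
               sendExecutor.execute(() -> {
                   try {
                       release.await();
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               });
               sendExecutor.submit(() -> { }); // a queued send request
               // The callback goes to its own executor and never touches sendQueue.
               CompletableFuture.completedFuture("response").thenAcceptAsync(r -> { }, callbackExecutor);
               return sendQueue.size();
           } finally {
               release.countDown();
               sendExecutor.shutdown();
               callbackExecutor.shutdown();
           }
       }
   }
   ```

   The trade-off is one more executor to size and monitor, which is exactly the concern raised in this thread about the number of blocking queues and executor services on the broker.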

