Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/11/18 09:50:22 UTC

[GitHub] [rocketmq] XiaoyiPeng opened a new pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

XiaoyiPeng opened a new pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509


   **Make sure to set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   Fix bug: https://github.com/apache/rocketmq/issues/2516
   
   ## Brief changelog
   1. Following the approach discussed by the original issue author and @RongtongJin, optimize the method `BrokerController#headSlowTimeMills(BlockingQueue<Runnable> q)`.
   
   2. Iterate over `BrokerController#sendThreadPoolQueue` and use the first element that meets the condition (a task that can be cast to `RequestTask`), or none if no element does, to calculate the wait time, instead of calling `sendThreadPoolQueue#peek()`. This takes concurrency into account: `BrokerController#sendThreadPoolQueue` is polled concurrently by the threads of `BrokerController#sendMessageExecutor`.
   
   3. However, for monitoring purposes, I think it would be better to count the distribution of this value across time intervals, as `StoreStatsService#putMessageDistributeTime` does with the buckets defined in `StoreStatsService#PUT_MESSAGE_ENTIRE_TIME_MAX_DESC`:
   ```java
   private static final String[] PUT_MESSAGE_ENTIRE_TIME_MAX_DESC = new String[] {
           "[<=0ms]", "[0~10ms]", "[10~50ms]", "[50~100ms]", "[100~200ms]", "[200~500ms]", "[500ms~1s]", "[1~2s]", "[2~3s]", "[3~4s]", "[4~5s]", "[5~10s]", "[10s~]",
       };
   ```
   **This matters because RocketMQ-Exporter monitors `sendThreadPoolQueueHeadWaitTimeMills` as a Prometheus `gauge`**, which only exposes the single value sampled at each scrape.
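   
   A minimal sketch of the distribution idea in point 3. `WaitTimeHistogram` is a hypothetical class, not part of RocketMQ; it reuses the bucket labels above and assumes each bound is an inclusive upper limit (the boundaries `StoreStatsService` actually applies may differ). Every sampled head wait time increments one bucket, so a reporter could expose the whole distribution for an interval instead of one gauge value:
   
   ```java
   import java.util.concurrent.atomic.AtomicLongArray;

   class WaitTimeHistogram {
       // Bucket labels copied from PUT_MESSAGE_ENTIRE_TIME_MAX_DESC
       static final String[] DESC = {
           "[<=0ms]", "[0~10ms]", "[10~50ms]", "[50~100ms]", "[100~200ms]", "[200~500ms]",
           "[500ms~1s]", "[1~2s]", "[2~3s]", "[3~4s]", "[4~5s]", "[5~10s]", "[10s~]",
       };
       // Assumed inclusive upper bound (ms) of every bucket except the open-ended last one
       static final long[] UPPER_MS = {0, 10, 50, 100, 200, 500, 1000, 2000, 3000, 4000, 5000, 10000};

       private final AtomicLongArray counts = new AtomicLongArray(DESC.length);

       /** Returns the index of the bucket that waitMs falls into. */
       static int bucketIndex(long waitMs) {
           for (int i = 0; i < UPPER_MS.length; i++) {
               if (waitMs <= UPPER_MS[i]) {
                   return i;
               }
           }
           return DESC.length - 1; // "[10s~]"
       }

       /** Records one sample; safe to call from several threads. */
       void record(long waitMs) {
           counts.incrementAndGet(bucketIndex(waitMs));
       }

       long count(int bucket) {
           return counts.get(bucket);
       }

       /** Renders the distribution as "label:count" pairs, e.g. for a periodic log line. */
       String render() {
           StringBuilder sb = new StringBuilder();
           for (int i = 0; i < DESC.length; i++) {
               sb.append(DESC[i]).append(':').append(counts.get(i)).append(' ');
           }
           return sb.toString().trim();
       }
   }
   ```
   
   A periodic monitoring task could call `record(...)` with each sampled wait time and emit `render()` (then reset) once per interval; `AtomicLongArray` keeps the counters consistent if several threads record at once.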
   
   ## Verifying this change
   ![image](https://user-images.githubusercontent.com/8653312/142388518-93330ef1-d3ce-42d0-acaa-ef35aa500501.png)
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-974750856


   > It is also a time-consuming operation to cast all send requests to RequestTask especially when there are many send requests. I think we need to consider whether it is better to isolate the thread pool instead of optimize headslowtimemills.
   
   Thanks for your review.
   However:
   1. Java streams are evaluated lazily: `findFirst()` short-circuits, so the pipeline stops at the first task that meets the condition instead of casting every send request to `RequestTask`.
   2. We only need `headSlowTimeMills` to monitor the load of `BrokerController#sendMessageExecutor`, so introducing a new thread pool for such a small optimization may be too expensive.
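   
   To illustrate point 1 outside the broker, here is a small sketch in which a plain `List` stands in for the send queue (`LazyStreamDemo` is a hypothetical name). The filter counts how many elements it inspects: `findFirst()` stops as soon as one element matches, but when nothing matches, every element is visited:
   
   ```java
   import java.util.List;
   import java.util.concurrent.atomic.AtomicInteger;

   class LazyStreamDemo {
       /** Returns how many elements the filter inspected before findFirst() finished. */
       static int inspectedBeforeMatch(List<?> items, Class<?> wanted) {
           AtomicInteger inspected = new AtomicInteger();
           items.stream()
                   .filter(o -> {
                       inspected.incrementAndGet(); // side effect used only for counting
                       return wanted.isInstance(o);
                   })
                   .findFirst(); // short-circuiting terminal operation
           return inspected.get();
       }
   }
   ```
   
   For example, `inspectedBeforeMatch(Arrays.asList("a", "b", 42, "c", 7), Integer.class)` inspects three elements, while a list with no `Integer` is walked to the end. The reproducer later in this thread filters on `o == null`, a predicate no element of a `LinkedBlockingQueue` can match, so its scan always walks the whole queue.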
   
   








[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-975356716


   @duhenglucky, if you have time, please do a review. Thank you very much.





[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995773033


   > ![image](https://user-images.githubusercontent.com/33629004/146357727-836e8470-09b1-4fb4-aeb2-d557cf944d2d.png) ![image](https://user-images.githubusercontent.com/33629004/146357766-ba8e7c16-79c2-4db8-80f9-a9dd6ab51559.png) your test case has some differences with @areyouok , that is the point.
   
   OK, that's my fault. I had changed it while I was debugging the code.
   
   Thank you very much!





[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995634452


   Hi, @areyouok  @RongtongJin  @duhenglucky 
   
   I reviewed the given test cases again: the broker never hangs in your test, @areyouok.
   
   In your test case, you concluded that the **broker hangs** because the console showed "begin scan, i still alive" without a final "finish scan, i still alive" (or vice versa).
   
   **That's because the IDE console lied to us!**
   **Our naked eyes cannot perceive the tiny changes in the console!**
   
   You can use an `AtomicInteger` counter to number each "begin scan, i still alive" and "finish scan, i still alive" line, as in the code below:
   
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.atomic.AtomicInteger;

   public class TestLinkedBlockingQueue {
       public static void main(String[] args) throws InterruptedException {
           // Count how many times each message has been printed
           AtomicInteger startScanTimes = new AtomicInteger();
           AtomicInteger finishScanTimes = new AtomicInteger();
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
           // 10 threads keep enqueuing and dequeuing, so the head node changes constantly
           for (int i = 0; i < 10; i++) {
               new Thread(() -> {
                   while (true) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               }).start();
           }
   
           while (true) {
               System.out.println("begin scan, i still alive, times: " + startScanTimes.incrementAndGet());
               // Scan through a stream; findFirst() stops at the first element that passes the filter
               queue.stream().filter(o -> o != null)
                       .findFirst().isPresent();
               Thread.sleep(1000);
               System.out.println("finish scan, i still alive, times: " + finishScanTimes.incrementAndGet());
           }
       }
   }
   ```
   A fatigue test with this test case never hangs; it keeps printing both messages forever!
   
   As for the **deadlock**, that was my initial misjudgment. Whenever a method of `LinkedBlockingQueue` needs both `putLock` and `takeLock`, it always acquires them in the same order, so the two locks cannot deadlock each other.
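   
   The ordering argument can be sketched with two `ReentrantLock`s. `TwoLockOrdering` is a hypothetical class modeled on the `fullyLock()`/`fullyUnlock()` pair that JDK 8's `LinkedBlockingQueue` uses when it needs both locks (for example in its iterator and spliterator); single-lock operations such as `offer` and `poll` take only `putLock` or `takeLock`:
   
   ```java
   import java.util.concurrent.locks.ReentrantLock;

   class TwoLockOrdering {
       final ReentrantLock putLock = new ReentrantLock();  // guards the tail (offer/put)
       final ReentrantLock takeLock = new ReentrantLock(); // guards the head (poll/take)

       /** Any operation that needs both locks acquires them in this fixed order. */
       void fullyLock() {
           putLock.lock();
           takeLock.lock();
       }

       /** Releases both locks in the reverse order. */
       void fullyUnlock() {
           takeLock.unlock();
           putLock.unlock();
       }

       boolean bothHeldByCurrentThread() {
           return putLock.isHeldByCurrentThread() && takeLock.isHeldByCurrentThread();
       }
   }
   ```
   
   Because every path takes `putLock` before `takeLock`, no two threads can each hold one lock while waiting for the other. The ordering rules out only that cycle: a thread that holds both locks and never releases them still blocks every `offer()` and `poll()` on the queue.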
   
   Moreover, `LinkedBlockingQueue` was written by **Doug Lea**; I don't think it would have been tested so laxly.
   
   Best regards!
   
   











[GitHub] [rocketmq] RongtongJin commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

RongtongJin commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-974773621


   > 
   
   OK, I got it





[GitHub] [rocketmq] RongtongJin commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

RongtongJin commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991457729


   > This commit cause critical problem. we should rollback it. And I sugguest review PR #3540 again.
   > 
   > @RongtongJin @XiaoyiPeng @duhengforever
   > 
   > The broker hangs in our test:
   > 
   > ```
   > "BrokerControllerScheduledThread1" #30 prio=5 os_prio=31 tid=0x00007fabdb29d800 nid=0x6a03 runnable [0x000070000ae7c000]
   >    java.lang.Thread.State: RUNNABLE
   > 	at java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.tryAdvance(LinkedBlockingQueue.java:950)
   > 	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
   > 	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
   > 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
   > 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   > 	at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
   > 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   > 	at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
   > 	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills(BrokerController.java:658)
   > 	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills4SendThreadPoolQueue(BrokerController.java:672)
   > 	at org.apache.rocketmq.broker.BrokerController.printWaterMark(BrokerController.java:688)
   > 	at org.apache.rocketmq.broker.BrokerController$5.run(BrokerController.java:386)
   > 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   > 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
   > 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
   > 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
   > 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   > 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   > 	at java.lang.Thread.run(Thread.java:748)
   > 
   >    Locked ownable synchronizers:
   > 	- <0x000000078014eeb8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	- <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	- <0x0000000780d2ba58> (a java.util.concurrent.ThreadPoolExecutor$Worker)
   > 
   > "NettyServerCodecThread_8" #81 prio=5 os_prio=31 tid=0x00007fabdb294000 nid=0x13d03 waiting on condition [0x000070000e215000]
   >    java.lang.Thread.State: WAITING (parking)
   > 	at sun.misc.Unsafe.park(Native Method)
   > 	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   > 	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   > 	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   > 	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   > 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   > 	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   > 	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   > 	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   > 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   > 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   > 	at java.lang.Thread.run(Thread.java:748)
   > 
   >    Locked ownable synchronizers:
   > 	- None
   > 
   > "NettyServerCodecThread_6" #79 prio=5 os_prio=31 tid=0x00007fabe209d800 nid=0xc503 waiting on condition [0x000070000e00f000]
   >    java.lang.Thread.State: WAITING (parking)
   > 	at sun.misc.Unsafe.park(Native Method)
   > 	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   > 	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   > 	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   > 	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   > 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   > 	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   > 	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   > 	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   > 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   > 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   > 	at java.lang.Thread.run(Thread.java:748)
   > 
   >    Locked ownable synchronizers:
   > 	- None
   > 
   > ...
   > ```
   > 
   > some simple code to re-produce it:
   > 
   > ```
   > public class TestQueue {
   >     public static void main(String[] args) throws Exception {
   >         LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
   >         for (int i = 0; i < 10; i++) {
   >             new Thread(() -> {
   >                 while (true) {
   >                     queue.offer(new Object());
   >                     queue.remove();
   >                 }
   >             }).start();
   >         }
   >         while (true) {
   >             System.out.println("begin scan, i still alive");
   >             queue.stream()
   >                     .filter(o -> o == null)
   >                     .findFirst()
   >                     .isPresent();
   >             Thread.sleep(100);
   >             System.out.println("finish scan, i still alive");
   >         }
   >     }
   > }
   > ```
   
   Good catch! I will fix it ASAP.





[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995649654


   > This problem will only appear in the Jdk8 environment, please make sure your Java version is 8.
   
   Hi, @heihaozi 
   ```
   $ java -version
   java version "1.8.0_281"
   Java(TM) SE Runtime Environment (build 1.8.0_281-b09)
   Java HotSpot(TM) 64-Bit Server VM (build 25.281-b09, mixed mode)
   ```
   The above is my Java version information. 
   
   You can run a fatigue test with the test case yourself. Or could you point me to the related JDK issue for this problem?
   
   Thank you very much!








[GitHub] [rocketmq] shuangchengsun removed a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

shuangchengsun removed a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995655837


   ![image](https://user-images.githubusercontent.com/33629004/146356947-dd6303c8-ed44-48e0-9a2f-dbc0311bbbb4.png)
   ![image](https://user-images.githubusercontent.com/33629004/146357044-111f65ba-4ceb-4595-9ff2-5b334acd3470.png)
   





[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995898568


   > performance
   
   OK, I get it. Thank you.





[GitHub] [rocketmq] guyinyou commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

guyinyou commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-992142648


   I don't think it's a deadlock problem.
   
   When `queue.remove()` dequeues the head, it points the removed node's `next` field at the node itself (`h.next = h`):
   **java.util.concurrent.LinkedBlockingQueue#dequeue**
   ```
       private E dequeue() {
           // assert takeLock.isHeldByCurrentThread();
           // assert head.item == null;
           Node<E> h = head;
           Node<E> first = h.next;
           h.next = h; // help GC
           head = first;
           E x = first.item;
           first.item = null;
           return x;
       }
   ```
   
   If the spliterator behind `queue.stream()` has saved such a node as its current position, its traversal falls into an endless loop in:
   **LinkedBlockingQueue$LBQSpliterator#tryAdvance()**
   ![image](https://user-images.githubusercontent.com/36399867/145761467-a085c8b9-6a9b-4cb3-9af8-e1127cf67012.png)
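   
   The effect can be reproduced without threads in a simplified model. `SelfLinkDemo` is hypothetical: it copies the `dequeue()` steps quoted above onto a minimal node list (no locks) and compares a naive cursor walk that only stops at `null`, which is how the endless loop described above arises, with a walk that treats `p.next == p` as "this node was dequeued" and resumes from `head.next`. JDK 8's `LinkedBlockingQueue` iterator performs a similar self-link check; the step cap in `naiveSteps` exists only so the demo terminates:
   
   ```java
   class SelfLinkDemo {
       /** Minimal singly linked node, shaped like LinkedBlockingQueue's. */
       static final class Node {
           Object item;
           Node next;
           Node(Object item) { this.item = item; }
       }

       Node head = new Node(null); // dummy head node, item always null
       Node last = head;

       void enqueue(Object x) {
           last = last.next = new Node(x);
       }

       /** Same steps as the quoted LinkedBlockingQueue#dequeue. */
       Object dequeue() {
           Node h = head;
           Node first = h.next;
           h.next = h; // the old head now points to itself
           head = first;
           Object x = first.item;
           first.item = null; // first becomes the new dummy head
           return x;
       }

       /** Naive walk that only stops at null; gives up after maxSteps. */
       static int naiveSteps(Node start, int maxSteps) {
           int steps = 0;
           for (Node p = start; p != null && steps < maxSteps; p = p.next) {
               steps++;
           }
           return steps;
       }

       /** Counts live items from start, jumping to head.next on a self-linked node. */
       int liveItemsFrom(Node start) {
           int live = 0;
           Node p = start;
           while (p != null) {
               Node s = p.next;
               if (s == p) { // p was dequeued: resume from the current first node
                   p = head.next;
                   continue;
               }
               if (p.item != null) {
                   live++;
               }
               p = s;
           }
           return live;
       }
   }
   ```
   
   For instance, after enqueuing "a", "b", "c", parking a cursor on "a", and dequeuing twice, the cursor's node links to itself: `naiveSteps` runs until its cap, while `liveItemsFrom` finds the one remaining item. In the real queue the scan and the dequeuing threads race, so whether a cursor lands on a self-linked node depends on timing.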
   





[GitHub] [rocketmq] Jason918 commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Jason918 commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r752839994



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()

Review comment:
       OK, I see. Nice work







[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-973703556


   > Do we have existing unit test for this public method `headSlowTimeMills`?
   
   No, the previous version didn't have one.





[GitHub] [rocketmq] WJL3333 commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r753649255



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()
+                .map(BrokerFastFailure::castRunnable)
+                .filter(Objects::nonNull)
+                .findFirst();
+        if (op.isPresent()) {

Review comment:
       I wonder whether a stream is needed here; a plain null check is much cheaper.
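   For comparison, a plain-loop version of the same lookup might look like the sketch below. `Task`, `cast` and the `now` parameter are hypothetical stand-ins for `RequestTask`, `BrokerFastFailure#castRunnable` and `messageStore.now()`. The sketch only swaps the stream pipeline for a `for` loop with a null check; it does not by itself change the locking behavior discussed later in this thread.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   class HeadWaitLoop {
       // Hypothetical stand-in for RequestTask: only the creation timestamp matters here.
       static final class Task implements Runnable {
           final long createTimestamp;
           Task(long createTimestamp) { this.createTimestamp = createTimestamp; }
           @Override public void run() { }
       }

       // Hypothetical stand-in for castRunnable: returns null for any other Runnable.
       static Task cast(Runnable r) {
           return r instanceof Task ? (Task) r : null;
       }

       // Wait time of the first element that casts to a Task, or 0 if there is none.
       static long headSlowTimeMills(BlockingQueue<Runnable> q, long now) {
           for (Runnable r : q) { // stops at the first match, like findFirst()
               Task t = cast(r);
               if (t != null) {
                   return now - t.createTimestamp;
               }
           }
           return 0;
       }

       public static void main(String[] args) {
           BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
           q.offer(() -> { });     // not a Task: skipped
           q.offer(new Task(100));
           q.offer(new Task(200));
           System.out.println(headSlowTimeMills(q, 1000)); // 900
       }
   }
   ```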







[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r752836279



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()

Review comment:
       Thanks for your review.
   
   It has nothing to do with thread safety; the goal is to reduce thread contention. `q` is polled by threads in `BrokerController#sendMessageExecutor` and peeked once per second by a thread in `BrokerController#scheduledExecutorService`, as shown below:
   
   ```java
   this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
       @Override
       public void run() {
           try {
               BrokerController.this.printWaterMark();
           } catch (Throwable e) {
               log.error("printWaterMark error.", e);
           }
       }
   }, 10, 1, TimeUnit.SECONDS);
   ```
   Please take a look at issue #2516 first for some context.







[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995634452


   Hi, @areyouok  @RongtongJin  @duhenglucky 
   
   I reviewed the given test cases again; the broker never hangs in your test, @areyouok.
   
   In your test case, you concluded that the **broker hangs** because the console printed "begin scan, i still alive" but no following "finish scan, i still alive", or vice versa.
   
   **That's because the IDE console misled us!**
   **With the naked eye, we cannot perceive such tiny changes in the console!**
   
   You can use `AtomicInteger` counters to print how many times "begin scan, i still alive" and "finish scan, i still alive" have each been printed. The code is shown below:
   
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class TestLinkedBlockingQueue {
       public static void main(String[] args) throws InterruptedException {
           // Count how many times each message has been printed
           AtomicInteger startScanTimes = new AtomicInteger();
           AtomicInteger finishScanTimes = new AtomicInteger();
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
           // 10 threads keep adding and removing elements concurrently
           for (int i = 0; i < 10; i++) {
               new Thread(() -> {
                   while (true) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               }).start();
           }
   
           // Scan the queue once per second; both counters should keep increasing in step
           while (true) {
               System.out.println("begin scan, i still alive, times: " + startScanTimes.incrementAndGet());
               queue.stream().filter(o -> o != null)
                       .findFirst().isPresent();
               Thread.sleep(1000);
               System.out.println("finish scan, i still alive, times: " + finishScanTimes.incrementAndGet());
           }
       }
   }
   ```
   **We can run a fatigue test with this test case: it never hangs and keeps printing the given info forever!**
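   Instead of comparing console lines by eye, a watchdog can check progress programmatically. The sketch below is a variant of the test above under stated assumptions: the producer loop and the scan predicate come from that test, while `ScanWatchdog`, `scannerMakesProgress` and its parameters are hypothetical. A run that keeps progressing is evidence, not proof, since the outcome can depend on the JVM and on the scan predicate.

   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicLong;

   class ScanWatchdog {
       // Returns false as soon as a whole window passes without a completed scan.
       static boolean scannerMakesProgress(int producers, int windows, long windowMillis) {
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
           AtomicBoolean running = new AtomicBoolean(true);
           AtomicLong finishedScans = new AtomicLong();

           for (int i = 0; i < producers; i++) {
               Thread producer = new Thread(() -> {
                   while (running.get()) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               });
               producer.setDaemon(true); // a stuck thread must not keep the JVM alive
               producer.start();
           }

           Thread scanner = new Thread(() -> {
               while (running.get()) {
                   queue.stream().filter(o -> o != null).findFirst();
                   finishedScans.incrementAndGet();
               }
           });
           scanner.setDaemon(true);
           scanner.start();

           try {
               long last = 0;
               for (int w = 0; w < windows; w++) {
                   Thread.sleep(windowMillis);
                   long now = finishedScans.get();
                   if (now == last) {
                       return false; // no scan finished during this window: treat as stuck
                   }
                   last = now;
               }
               return true;
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return false;
           } finally {
               running.set(false); // let the worker threads leave their loops
           }
       }

       public static void main(String[] args) {
           System.out.println(scannerMakesProgress(10, 10, 1000) ? "scanner alive" : "scanner stuck");
       }
   }
   ```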
   
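   As for the **potential deadlock**, that was my initial misjudgment. In `LinkedBlockingQueue`, `putLock` and `takeLock` are never acquired in inconsistent orders: `fullyLock()` always takes `putLock` before `takeLock`, and single-lock operations such as `offer()` and `remove()` release their own lock before touching the other one.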
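   Consistent lock ordering is the usual argument against this kind of deadlock: if every thread that needs both locks takes them in the same order, no cycle of waiting threads can form. The sketch below models that pattern with two `ReentrantLock`s; the class and its counters are hypothetical, and `scan()` is modeled on `fullyLock()`/`fullyUnlock()` rather than copied from the JDK.

   ```java
   import java.util.concurrent.locks.ReentrantLock;

   class LockOrderingDemo {
       final ReentrantLock putLock = new ReentrantLock();
       final ReentrantLock takeLock = new ReentrantLock();
       int puts, takes, scans; // each counter is written by exactly one thread

       void put() {
           putLock.lock();
           try { puts++; } finally { putLock.unlock(); }
       }

       void take() {
           takeLock.lock();
           try { takes++; } finally { takeLock.unlock(); }
       }

       // Two-lock operation: always putLock first, then takeLock; released in reverse order.
       void scan() {
           putLock.lock();
           takeLock.lock();
           try {
               scans++;
           } finally {
               takeLock.unlock();
               putLock.unlock();
           }
       }

       // Runs put, take and scan concurrently, n times each, and returns the finished demo.
       static LockOrderingDemo run(int n) {
           LockOrderingDemo d = new LockOrderingDemo();
           Thread[] threads = {
               new Thread(() -> { for (int i = 0; i < n; i++) d.put(); }),
               new Thread(() -> { for (int i = 0; i < n; i++) d.take(); }),
               new Thread(() -> { for (int i = 0; i < n; i++) d.scan(); }),
           };
           for (Thread t : threads) t.start();
           try {
               for (Thread t : threads) t.join();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return d;
       }

       public static void main(String[] args) {
           LockOrderingDemo d = run(100_000);
           System.out.println(d.puts + " " + d.takes + " " + d.scans); // 100000 100000 100000
       }
   }
   ```

   Completing a run does not prove anything on its own; the guarantee comes from the fixed acquisition order in `scan()`.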
   
   Besides, the author of `LinkedBlockingQueue` is **Doug Lea**; I don't think their test team would be that lax.
   
   best regards!
   
   





[GitHub] [rocketmq] shuangchengsun commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
shuangchengsun commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995655837


   ![image](https://user-images.githubusercontent.com/33629004/146356947-dd6303c8-ed44-48e0-9a2f-dbc0311bbbb4.png)
   ![image](https://user-images.githubusercontent.com/33629004/146357044-111f65ba-4ceb-4595-9ff2-5b334acd3470.png)
   





[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-974750856


   > It is also a time-consuming operation to cast all send requests to RequestTask especially when there are many send requests. I think we need to consider whether it is better to isolate the thread pool instead of optimize headslowtimemills.
   
   Thanks for your review.
   However:
   1. Java streams are evaluated lazily: `findFirst()` short-circuits, so the pipeline stops at the first task that meets the condition instead of casting every send request to `RequestTask`.
   2. For such a small optimization (we only need `sendThreadPoolQueueHeadWaitTimeMills` to monitor the load of `BrokerController#sendMessageExecutor`), introducing a new thread pool may be too expensive.
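   The short-circuiting claim in point 1 can be checked with a small sketch. `LazyFindFirstDemo` is hypothetical, and the `instanceof Integer` test merely stands in for `castRunnable`; the counter records how many elements reach the `map` step of a sequential stream.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Objects;
   import java.util.concurrent.atomic.AtomicInteger;

   class LazyFindFirstDemo {
       // Counts how many elements reach the map step before findFirst() short-circuits.
       static int mappedBeforeMatch(List<Object> snapshot) {
           AtomicInteger mapped = new AtomicInteger();
           snapshot.stream()
                   .map(o -> {
                       mapped.incrementAndGet();
                       // hypothetical stand-in for castRunnable: null when not a "task"
                       return o instanceof Integer ? (Integer) o : null;
                   })
                   .filter(Objects::nonNull)
                   .findFirst();
           return mapped.get();
       }

       public static void main(String[] args) {
           List<Object> items = Arrays.asList("not-a-task", 1, 2, 3, 4, 5);
           System.out.println(mappedBeforeMatch(items)); // 2
       }
   }
   ```

   Here the map step runs twice, once for the non-matching element and once for the first match, regardless of how many elements follow.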
   
   





[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995634452


   Hi, @areyouok  @RongtongJin  @duhenglucky 
   
   I reviewed the given test cases again; the broker never hangs in your test, @areyouok.
   
   In your test case, you concluded that the **broker hangs** because the console printed "begin scan, i still alive" but no following "finish scan, i still alive", or vice versa.
   
   **That's because the IDE console misled us!**
   **With the naked eye, we cannot perceive such tiny changes in the console!**
   
   You can use `AtomicInteger` counters to print how many times "begin scan, i still alive" and "finish scan, i still alive" have each been printed. The code is shown below:
   
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class TestLinkedBlockingQueue {
       public static void main(String[] args) throws InterruptedException {
           // Count how many times each message has been printed
           AtomicInteger startScanTimes = new AtomicInteger();
           AtomicInteger finishScanTimes = new AtomicInteger();
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
           // 10 threads keep adding and removing elements concurrently
           for (int i = 0; i < 10; i++) {
               new Thread(() -> {
                   while (true) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               }).start();
           }
   
           // Scan the queue once per second; both counters should keep increasing in step
           while (true) {
               System.out.println("begin scan, i still alive, times: " + startScanTimes.incrementAndGet());
               queue.stream().filter(o -> o != null)
                       .findFirst().isPresent();
               Thread.sleep(1000);
               System.out.println("finish scan, i still alive, times: " + finishScanTimes.incrementAndGet());
           }
       }
   }
   ```
   **We can run a fatigue test with this test case: it never hangs and keeps printing the given info forever!**
   
   
   best regards!
   
   





[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-973826758


   > > > Do we have existing unit test for this public method `headSlowTimeMills`?
   > > 
   > > 
   > > No, the previous version didn't one.
   > 
   > It would be nice if you can provide a unit test for this method, to make sure it works properly.
   
   I have submitted unit tests for `BrokerController#headSlowTimeMills`; please take a look.
   





[GitHub] [rocketmq] coveralls edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-972735351


   
   [![Coverage Status](https://coveralls.io/builds/44397393/badge)](https://coveralls.io/builds/44397393)
   
   Coverage increased (+0.07%) to 55.191% when pulling **045e0c92303ea8397e1c6dd774993bb5fb7c30a5 on XiaoyiPeng:sendThreadPoolQueueHeadWaitTimeMills** into **4bb99e656b1acc99d3a953f84ae0abcb74737af5 on apache:develop**.
   





[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r753731333



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()
+                .map(BrokerFastFailure::castRunnable)
+                .filter(Objects::nonNull)
+                .findFirst();
+        if (op.isPresent()) {

Review comment:
       Thanks for your review.
   
   But I don't quite get the point of your question: what's wrong with using `stream` here?







[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991684075


   > This commit cause critical problem. we should rollback it. And I sugguest review PR #3540 again.
   > 
   
   Thanks for your review, @areyouok and @RongtongJin.
   
   I know the reason: this is a potential **lock-ordering deadlock** involving `putLock` and `takeLock` of `LinkedBlockingQueue`.
   
   `offer()` acquires `LinkedBlockingQueue#putLock`, and `remove()` acquires `LinkedBlockingQueue#takeLock`.
   
   Meanwhile, in this PR, the stream operation on `LinkedBlockingQueue` invokes `LinkedBlockingQueue#fullyLock()` in `LinkedBlockingQueue$LBQSpliterator#tryAdvance()`, as shown below:
    
   ```java
           public boolean tryAdvance(Consumer<? super E> action) {
               if (action == null) throw new NullPointerException();
               final LinkedBlockingQueue<E> q = this.queue;
               if (!exhausted) {
                   E e = null;
                   q.fullyLock();
                   try {
                       if (current == null)
                           current = q.head.next;
                       while (current != null) {
                           e = current.item;
                           current = current.next;
                           if (e != null)
                               break;
                       }
                   } finally {
                       q.fullyUnlock();
                   }
                   if (current == null)
                       exhausted = true;
                   if (e != null) {
                       action.accept(e);
                       return true;
                   }
               }
               return false;
           }
   ```
   
   I will fix it!





[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r752836279



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()

Review comment:
       Thanks for your review.
   
   It has nothing to do with thread safety; the goal is to reduce thread contention. `q` is polled by threads in `BrokerController#sendMessageExecutor` and peeked once per second by a thread in `BrokerController#scheduledExecutorService` (see the scheduling code below), and both `poll()` and `peek()` acquire the same lock, `LinkedBlockingQueue#takeLock`:
   ```java
   this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
       @Override
       public void run() {
           try {
               BrokerController.this.printWaterMark();
           } catch (Throwable e) {
               log.error("printWaterMark error.", e);
           }
       }
   }, 10, 1, TimeUnit.SECONDS);
   ```
   Please take a look at issue #2516 first for some context.







[GitHub] [rocketmq] WJL3333 commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r753649255



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()
+                .map(BrokerFastFailure::castRunnable)
+                .filter(Objects::nonNull)
+                .findFirst();
+        if (op.isPresent()) {

Review comment:
       I wonder whether a stream is needed here; a plain null check is much cheaper.

##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()
+                .map(BrokerFastFailure::castRunnable)
+                .filter(Objects::nonNull)
+                .findFirst();
+        if (op.isPresent()) {

Review comment:
       My bad. After checking the discussion, I see the code is right.







[GitHub] [rocketmq] areyouok commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
areyouok commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991437424


   This commit causes a critical problem, and we should roll it back. I also suggest reviewing PR #3540 again.
   
   @RongtongJin @XiaoyiPeng @duhengforever 
   
   The broker hangs in our test:
   ```
   "BrokerControllerScheduledThread1" #30 prio=5 os_prio=31 tid=0x00007fabdb29d800 nid=0x6a03 runnable [0x000070000ae7c000]
      java.lang.Thread.State: RUNNABLE
   	at java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.tryAdvance(LinkedBlockingQueue.java:950)
   	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
   	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
   	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
   	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   	at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
   	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   	at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
   	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills(BrokerController.java:658)
   	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills4SendThreadPoolQueue(BrokerController.java:672)
   	at org.apache.rocketmq.broker.BrokerController.printWaterMark(BrokerController.java:688)
   	at org.apache.rocketmq.broker.BrokerController$5.run(BrokerController.java:386)
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
   	- <0x000000078014eeb8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	- <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	- <0x0000000780d2ba58> (a java.util.concurrent.ThreadPoolExecutor$Worker)
   
   "NettyServerCodecThread_8" #81 prio=5 os_prio=31 tid=0x00007fabdb294000 nid=0x13d03 waiting on condition [0x000070000e215000]
      java.lang.Thread.State: WAITING (parking)
   	at sun.misc.Unsafe.park(Native Method)
   	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
   	- None
   
   "NettyServerCodecThread_6" #79 prio=5 os_prio=31 tid=0x00007fabe209d800 nid=0xc503 waiting on condition [0x000070000e00f000]
      java.lang.Thread.State: WAITING (parking)
   	at sun.misc.Unsafe.park(Native Method)
   	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
   	- None
   
   ...
   ```
   
   
   Some simple code to reproduce it:
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   
   public class TestQueue {
       public static void main(String[] args) throws Exception {
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
           for (int i = 0; i < 10; i++) {
               new Thread(() -> {
                   while (true) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               }).start();
           }
           while (true) {
               System.out.println("begin scan, i still alive");
               queue.stream()
                       .filter(o -> o == null)
                       .findFirst()
                       .isPresent();
               Thread.sleep(100);
               System.out.println("finish scan, i still alive");
           }
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995779159


   > I think it's not a deadlock problem.
   > 
   > When queue.remove() is called, the `next` pointer of the old head node is pointed back at the node itself. **java.util.concurrent.LinkedBlockingQueue#dequeue**
   > 
   > ```
   >     private E dequeue() {
   >         // assert takeLock.isHeldByCurrentThread();
   >         // assert head.item == null;
   >         Node<E> h = head;
   >         Node<E> first = h.next;
   >         h.next = h; // help GC
   >         head = first;
   >         E x = first.item;
   >         first.item = null;
   >         return x;
   >     }
   > ```
   > 
   > When queue.stream() traverses the queue concurrently, it can fall into an endless loop in **LinkedBlockingQueue$LBQSpliterator#tryAdvance()** ![image](https://user-images.githubusercontent.com/36399867/145761467-a085c8b9-6a9b-4cb3-9af8-e1127cf67012.png)
   
   Thanks for your review, gentleman, you are right!
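   
   The failure mode in the quoted review can be reproduced without threads. The sketch below is a simplified linked list, not the JDK source, and all names in it are hypothetical: `dequeue` self-links the old head as JDK 8's `LinkedBlockingQueue#dequeue` does, `naiveAdvance` follows `next` pointers the way the JDK 8 `LBQSpliterator#tryAdvance` loop does, and `safeAdvance` restarts from `head.next` on a self-link, which is what the JDK 8 iterator does. `naiveAdvance` takes a step limit only so the demo terminates. As far as we can tell from the JDK 8 source, the real loop has no limit and runs while holding both queue locks, which would fit the broker thread dump in this thread where `BrokerControllerScheduledThread1` owns two `ReentrantLock`s while the Netty threads park in `LinkedBlockingQueue.offer()`.
   
   ```java
   // Simplified model of a linked queue; NOT the JDK's LinkedBlockingQueue.
   public class SelfLinkDemo {
       static final class Node {
           Object item;
           Node next;
           Node(Object item) { this.item = item; }
       }
   
       // Sentinel head whose item is always null, as in LinkedBlockingQueue.
       Node head = new Node(null);
       Node last = head;
   
       void enqueue(Object x) {
           last = last.next = new Node(x);
       }
   
       // Mirrors JDK 8 dequeue(): the old head is linked to itself ("help GC").
       Object dequeue() {
           Node h = head;
           Node first = h.next;
           h.next = h;          // self-link
           head = first;
           Object x = first.item;
           first.item = null;   // first becomes the new sentinel
           return x;
       }
   
       // Follows next pointers like the JDK 8 spliterator loop; bounded only for this demo.
       // Returns the next non-null item, or null if none was found within maxSteps.
       static Object naiveAdvance(Node current, int maxSteps) {
           for (int i = 0; i < maxSteps && current != null; i++) {
               Object e = current.item;
               current = current.next;  // a self-linked node leads back to itself
               if (e != null) return e;
           }
           return null;
       }
   
       // Restarts from the live head whenever it lands on a self-linked node.
       Object safeAdvance(Node current) {
           while (current != null) {
               Object e = current.item;
               if (e != null) return e;
               Node s = current.next;
               current = (s == current) ? head.next : s;
           }
           return null;
       }
   
       public static void main(String[] args) {
           SelfLinkDemo q = new SelfLinkDemo();
           for (String s : new String[] {"A", "B", "C", "D"}) q.enqueue(s);
           // A traversal that has already returned "A" is parked on B's node.
           Node cursor = q.head.next.next;
           q.dequeue(); q.dequeue(); q.dequeue();  // removes A, B, C; B's node is now self-linked
           System.out.println("self-linked: " + (cursor.next == cursor));
           System.out.println("naive: " + naiveAdvance(cursor, 1_000_000));
           System.out.println("safe: " + q.safeAdvance(cursor));
       }
   }
   ```
   
   Running `main` prints `self-linked: true`, `naive: null` (the bounded loop spun on B's node until it gave up) and `safe: D`.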



[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995634452


   Hi, @areyouok  @RongtongJin  @duhenglucky 
   
   I reviewed the given test case again: the broker never hangs in your test, @areyouok.
   
   In your test case, you concluded that the **broker hangs** because the console printed "begin scan, i still alive" but never printed "finish scan, i still alive" afterwards, or vice versa.
   
   **That's because the IDE console lied to us!**
   **Our naked eyes cannot perceive the tiny changes in the console!**
   
   You can use an `AtomicInteger` to count how many times each message has been printed and append that count to "begin scan, i still alive" and "finish scan, i still alive". The code is shown below:
   
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class TestLinkedBlockingQueue {
       public static void main(String[] args) throws InterruptedException {
           AtomicInteger startScanTimes = new AtomicInteger();
           AtomicInteger finishScanTimes = new AtomicInteger();
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
           for (int i = 0; i < 10; i++) {
               new Thread(() -> {
                   while (true) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               }).start();
           }
   
           while (true) {
               System.out.println("begin scan, i still alive, times: " + startScanTimes.incrementAndGet());
               queue.stream().filter(o -> o != null)
                       .findFirst().isPresent();
               Thread.sleep(1000);
               System.out.println("finish scan, i still alive, times: " + finishScanTimes.incrementAndGet());
           }
       }
   }
   ```
   **We can run a fatigue test with this test case: it never hangs and keeps printing the given info forever!**
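   
   Rather than judging liveness from console output, each scan can also be run on a separate thread and awaited with a timeout, so a scan that never returns shows up as an explicit failure. The helper below is a hypothetical sketch that uses only `ExecutorService.submit` and `Future.get(long, TimeUnit)`; the name `completesWithin`, the 2-second budget and the 100-round loop are illustrative choices, not part of either test in this thread.
   
   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   public class ScanWatchdog {
       // Runs task on a fresh daemon thread; returns true if it finished within timeoutMillis.
       static boolean completesWithin(Runnable task, long timeoutMillis) {
           ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
               Thread t = new Thread(r, "scan-watchdog");
               t.setDaemon(true); // a stuck scan must not keep the JVM alive
               return t;
           });
           Future<?> future = executor.submit(task);
           try {
               future.get(timeoutMillis, TimeUnit.MILLISECONDS);
               return true;
           } catch (TimeoutException e) {
               return false;
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return false;
           } catch (ExecutionException e) {
               throw new IllegalStateException("scan threw an exception", e.getCause());
           } finally {
               // Interrupts the worker; a busy-spinning scan may ignore this, hence the daemon flag.
               executor.shutdownNow();
           }
       }
   
       public static void main(String[] args) {
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
           for (int i = 0; i < 10; i++) {
               Thread worker = new Thread(() -> {
                   while (true) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               });
               worker.setDaemon(true);
               worker.start();
           }
           for (int round = 1; round <= 100; round++) {
               // Full scan as in the original reproducer: the predicate never matches.
               boolean finished = completesWithin(
                       () -> queue.stream().filter(o -> o == null).findFirst().isPresent(), 2000);
               if (!finished) {
                   System.out.println("scan " + round + " did not finish within 2s");
                   return;
               }
           }
           System.out.println("100 scans finished");
       }
   }
   ```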
   
   As for the **deadlock**, that was my initial misjudgment. Whenever a `LinkedBlockingQueue` method needs both `putLock` and `takeLock`, it acquires them in the same order, so the two locks cannot deadlock against each other.
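   
   The ordering argument can be sketched in a few lines. This is a simplified illustration modeled on the `fullyLock()`/`fullyUnlock()` helpers that JDK 8's `LinkedBlockingQueue` uses when an operation needs both locks; the class `TwoLockOrdering` and its `withBothLocks` method are hypothetical. Every path that holds both locks takes `putLock` first and `takeLock` second, so two such paths can never each hold one lock while waiting for the other. Consistent ordering rules out a lock-order deadlock, but it does not stop one thread from blocking all others by holding both locks for a long time.
   
   ```java
   import java.util.concurrent.locks.ReentrantLock;
   import java.util.function.Supplier;
   
   // Hypothetical two-lock holder illustrating a fixed lock-acquisition order.
   public class TwoLockOrdering {
       final ReentrantLock putLock = new ReentrantLock();
       final ReentrantLock takeLock = new ReentrantLock();
   
       // Always acquire putLock before takeLock.
       void fullyLock() {
           putLock.lock();
           takeLock.lock();
       }
   
       // Release in reverse order of acquisition.
       void fullyUnlock() {
           takeLock.unlock();
           putLock.unlock();
       }
   
       // Runs an action that needs a consistent view of both ends of the queue.
       <T> T withBothLocks(Supplier<T> action) {
           fullyLock();
           try {
               return action.get();
           } finally {
               fullyUnlock();
           }
       }
   }
   ```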
   
   Besides, `LinkedBlockingQueue` was written by **Doug Lea**; I don't think its test team would be so lax.
   
   Best regards!
   
   



[GitHub] [rocketmq] areyouok commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
areyouok commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995890680


   @XiaoyiPeng 
   
   We found this problem in our performance test cluster. 
   
   Java 11 fixes it. Although this issue is caused by a JDK bug, we should avoid it because most production RocketMQ brokers run on Java 8.
   
   And I developed #3638 to improve stat performance.
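   
   One way to keep the "first real task" semantics without going through the JDK 8 spliterator is to walk the queue with its `Iterator`: in JDK 8, `LinkedBlockingQueue`'s weakly consistent iterator restarts from the current head when it reaches a concurrently dequeued (self-linked) node, instead of looping on it. The sketch below is a hypothetical stand-alone version, not the code of this PR or of #3638: `TimedTask` stands in for RocketMQ's `RequestTask`, the `instanceof` check stands in for the `map(BrokerFastFailure::castRunnable).filter(Objects::nonNull)` step in the diff, and `nowMillis` stands in for `messageStore.now()`.
   
   ```java
   import java.util.Iterator;
   import java.util.concurrent.BlockingQueue;
   
   public class HeadWaitTime {
       // Hypothetical stand-in for RocketMQ's RequestTask: a Runnable that records when it was created.
       static final class TimedTask implements Runnable {
           final long createTimestamp;
   
           TimedTask(long createTimestamp) {
               this.createTimestamp = createTimestamp;
           }
   
           @Override
           public void run() {
           }
       }
   
       // Returns how long the first TimedTask in q has been waiting, or 0 if the queue holds none.
       // Walks the queue with its Iterator instead of q.stream(); for JDK 8's LinkedBlockingQueue
       // the iterator restarts from the current head when it reaches a concurrently dequeued node.
       static long headSlowTimeMillis(BlockingQueue<Runnable> q, long nowMillis) {
           Iterator<Runnable> it = q.iterator();
           while (it.hasNext()) {
               Runnable r = it.next();
               if (r instanceof TimedTask) { // stands in for castRunnable(...) != null
                   return nowMillis - ((TimedTask) r).createTimestamp;
               }
           }
           return 0;
       }
   }
   ```
   
   Whether the extra lock traffic of a full iteration is acceptable on a busy broker, when no element matches, is a separate question that this sketch does not settle.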



[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995898568


   > performance
   
   OK, I got it. Thank you.



[GitHub] [rocketmq] haozhijie9527 commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
haozhijie9527 commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995644130


   This problem only appears on JDK 8; please make sure your Java version is 8.



[GitHub] [rocketmq] Jason918 commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r752818988



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()

Review comment:
       I am a little confused about this. We are using `java.util.concurrent.LinkedBlockingQueue` for this `q`, and it should be thread-safe. Can you give a detailed example of how this change helps?





[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-973709612


   > > Do we have existing unit test for this public method `headSlowTimeMills`?
   > 
   > No, the previous version didn't have one.
   
   But I tested it in my IDE: occurrences of `sendThreadPoolQueueHeadWaitTimeMills: 0` were significantly less frequent than with the previous implementation of `headSlowTimeMills`, see below:
   
   ![image](https://user-images.githubusercontent.com/8653312/142561899-3fc8f39e-2d5e-45f7-a2f0-76a21e60adc8.png)
   



[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-974750856


   > Casting all send requests to RequestTask is also time-consuming, especially when there are many send requests. I think we need to consider whether it is better to isolate the thread pool instead of optimizing headSlowTimeMills.
   
   Thanks for your review.
   However:
   1. Java streams are evaluated lazily: once the pipeline finds the first task that meets the condition, it returns, instead of casting all send requests to `RequestTask`.
   2. For such a small optimization (we only need `sendThreadPoolQueueHeadWaitTimeMills` to monitor the load of `BrokerController#sendMessageExecutor`), introducing a new thread pool may be a bit expensive.
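   
   Point 1 can be checked directly: a sequential stream pulls elements one at a time, so `map` runs only until `findFirst` sees a match. The sketch counts `map` invocations; `castOrNull` is a hypothetical stand-in for `BrokerFastFailure::castRunnable`, and `Integer` elements stand in for request tasks.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Objects;
   import java.util.Optional;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class LazyFindFirst {
       // Counts how many elements actually reach the map stage.
       static final AtomicInteger mapCalls = new AtomicInteger();
   
       // Hypothetical stand-in for BrokerFastFailure::castRunnable: null for anything that is not a "task".
       static Integer castOrNull(Object o) {
           mapCalls.incrementAndGet();
           return (o instanceof Integer) ? (Integer) o : null;
       }
   
       static Optional<Integer> firstTask(List<Object> queue) {
           return queue.stream()
                   .map(LazyFindFirst::castOrNull)
                   .filter(Objects::nonNull)
                   .findFirst();
       }
   
       public static void main(String[] args) {
           List<Object> queue = Arrays.asList("noise", "noise", 42, 7, 99);
           Optional<Integer> first = firstTask(queue);
           // prints: Optional[42] after 3 map calls
           System.out.println(first + " after " + mapCalls.get() + " map calls");
       }
   }
   ```
   
   When nothing matches, though, the pipeline still visits every element, which is the case the reproducers in this thread trigger with `filter(o -> o == null)`.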
   
   



[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r753731333



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()
+                .map(BrokerFastFailure::castRunnable)
+                .filter(Objects::nonNull)
+                .findFirst();
+        if (op.isPresent()) {

Review comment:
       Thanks for your review.
   
   But I don't quite follow your question: what's wrong with `stream`?





[GitHub] [rocketmq] coveralls commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
coveralls commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-972735351


   
   [![Coverage Status](https://coveralls.io/builds/44365206/badge)](https://coveralls.io/builds/44365206)
   
   Coverage decreased (-0.005%) to 55.114% when pulling **9347fda71927b9a0009f467ff3a631dcf49852f4 on XiaoyiPeng:sendThreadPoolQueueHeadWaitTimeMills** into **4bb99e656b1acc99d3a953f84ae0abcb74737af5 on apache:develop**.
   



[GitHub] [rocketmq] RongtongJin merged pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
RongtongJin merged pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509


   



[GitHub] [rocketmq] shuangchengsun commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
shuangchengsun commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995730383


   ![image](https://user-images.githubusercontent.com/33629004/146357727-836e8470-09b1-4fb4-aeb2-d557cf944d2d.png)
   ![image](https://user-images.githubusercontent.com/33629004/146357766-ba8e7c16-79c2-4db8-80f9-a9dd6ab51559.png)
   Your test case differs from @areyouok's, and that is the point.
   



[GitHub] [rocketmq] RongtongJin commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991457729


   > This commit causes a critical problem, and we should roll it back. I also suggest reviewing PR #3540 again.
   > 
   > @RongtongJin @XiaoyiPeng @duhengforever
   > 
   > The broker hangs in our test:
   > 
   > ```
   > "BrokerControllerScheduledThread1" #30 prio=5 os_prio=31 tid=0x00007fabdb29d800 nid=0x6a03 runnable [0x000070000ae7c000]
   >    java.lang.Thread.State: RUNNABLE
   > 	at java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.tryAdvance(LinkedBlockingQueue.java:950)
   > 	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
   > 	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
   > 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
   > 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   > 	at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
   > 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   > 	at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
   > 	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills(BrokerController.java:658)
   > 	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills4SendThreadPoolQueue(BrokerController.java:672)
   > 	at org.apache.rocketmq.broker.BrokerController.printWaterMark(BrokerController.java:688)
   > 	at org.apache.rocketmq.broker.BrokerController$5.run(BrokerController.java:386)
   > 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   > 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
   > 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
   > 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
   > 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   > 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   > 	at java.lang.Thread.run(Thread.java:748)
   > 
   >    Locked ownable synchronizers:
   > 	- <0x000000078014eeb8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	- <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	- <0x0000000780d2ba58> (a java.util.concurrent.ThreadPoolExecutor$Worker)
   > 
   > "NettyServerCodecThread_8" #81 prio=5 os_prio=31 tid=0x00007fabdb294000 nid=0x13d03 waiting on condition [0x000070000e215000]
   >    java.lang.Thread.State: WAITING (parking)
   > 	at sun.misc.Unsafe.park(Native Method)
   > 	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   > 	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   > 	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   > 	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   > 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   > 	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   > 	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   > 	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   > 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   > 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   > 	at java.lang.Thread.run(Thread.java:748)
   > 
   >    Locked ownable synchronizers:
   > 	- None
   > 
   > "NettyServerCodecThread_6" #79 prio=5 os_prio=31 tid=0x00007fabe209d800 nid=0xc503 waiting on condition [0x000070000e00f000]
   >    java.lang.Thread.State: WAITING (parking)
   > 	at sun.misc.Unsafe.park(Native Method)
   > 	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   > 	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   > 	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   > 	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   > 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   > 	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   > 	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   > 	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   > 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   > 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   > 	at java.lang.Thread.run(Thread.java:748)
   > 
   >    Locked ownable synchronizers:
   > 	- None
   > 
   > ...
   > ```
   > 
   > Some simple code to reproduce it:
   > 
   > ```java
   > import java.util.concurrent.LinkedBlockingQueue;
   > 
   > public class TestQueue {
   >     public static void main(String[] args) throws Exception {
   >         LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
   >         for (int i = 0; i < 10; i++) {
   >             new Thread(() -> {
   >                 while (true) {
   >                     queue.offer(new Object());
   >                     queue.remove();
   >                 }
   >             }).start();
   >         }
   >         while (true) {
   >             System.out.println("begin scan, i still alive");
   >             queue.stream()
   >                     .filter(o -> o == null)
   >                     .findFirst()
   >                     .isPresent();
   >             Thread.sleep(100);
   >             System.out.println("finish scan, i still alive");
   >         }
   >     }
   > }
   > ```
   
   Good catch! I will fix it ASAP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991684075


   > This commit causes a critical problem. We should roll it back. And I suggest reviewing PR #3540 again.
   > 
   
   Thanks for your review, @areyouok, @RongtongJin.
   
   I know the reason: this is a potential **lock-ordering deadlock** caused by the `putLock` and `takeLock` of `LinkedBlockingQueue`.
   
   The method `offer()` acquires `LinkedBlockingQueue#putLock`, and the method `remove()` acquires `LinkedBlockingQueue#takeLock`.
   
   Meanwhile, in this PR, the stream operation on `LinkedBlockingQueue` invokes `LinkedBlockingQueue#fullyLock()` in `LinkedBlockingQueue$LBQSpliterator#tryAdvance()`, as shown below:
    
   ```java
   public boolean tryAdvance(Consumer<? super E> action) {
       if (action == null) throw new NullPointerException();
       final LinkedBlockingQueue<E> q = this.queue;
       if (!exhausted) {
           E e = null;
           q.fullyLock();
           try {
               if (current == null)
                   current = q.head.next;
               while (current != null) {
                   e = current.item;
                   current = current.next;
                   if (e != null)
                       break;
               }
           } finally {
               q.fullyUnlock();
           }
           if (current == null)
               exhausted = true;
           if (e != null) {
               action.accept(e);
               return true;
           }
       }
       return false;
   }
   ```
   
   I will fix it!





[GitHub] [rocketmq] areyouok commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
areyouok commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991437424


   This commit causes a critical problem. We should roll it back. And I suggest reviewing PR #3540 again.
   
   @RongtongJin @XiaoyiPeng @duhengforever 
   
   The broker hangs in our test:
   ```
   "BrokerControllerScheduledThread1" #30 prio=5 os_prio=31 tid=0x00007fabdb29d800 nid=0x6a03 runnable [0x000070000ae7c000]
      java.lang.Thread.State: RUNNABLE
   	at java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.tryAdvance(LinkedBlockingQueue.java:950)
   	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
   	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
   	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
   	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   	at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
   	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   	at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
   	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills(BrokerController.java:658)
   	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills4SendThreadPoolQueue(BrokerController.java:672)
   	at org.apache.rocketmq.broker.BrokerController.printWaterMark(BrokerController.java:688)
   	at org.apache.rocketmq.broker.BrokerController$5.run(BrokerController.java:386)
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
   	- <0x000000078014eeb8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	- <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	- <0x0000000780d2ba58> (a java.util.concurrent.ThreadPoolExecutor$Worker)
   
   "NettyServerCodecThread_8" #81 prio=5 os_prio=31 tid=0x00007fabdb294000 nid=0x13d03 waiting on condition [0x000070000e215000]
      java.lang.Thread.State: WAITING (parking)
   	at sun.misc.Unsafe.park(Native Method)
   	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
   	- None
   
   "NettyServerCodecThread_6" #79 prio=5 os_prio=31 tid=0x00007fabe209d800 nid=0xc503 waiting on condition [0x000070000e00f000]
      java.lang.Thread.State: WAITING (parking)
   	at sun.misc.Unsafe.park(Native Method)
   	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
   	- None
   
   ...
   ```
   
   
   Some simple code to reproduce it:
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   
   public class TestQueue {
       public static void main(String[] args) throws Exception {
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
           for (int i = 0; i < 10; i++) {
               new Thread(() -> {
                   while (true) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               }).start();
           }
           while (true) {
               System.out.println("begin scan, i still alive");
               queue.stream()
                       .filter(o -> o == null)
                       .findFirst()
                       .isPresent();
               Thread.sleep(100);
               System.out.println("finish scan, i still alive");
           }
       }
   }
   ```
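   A self-checking variant of the reproduction above may be easier to judge than console output. This is only a sketch under stated assumptions: `ScanWatchdog` and `completedScans` are hypothetical names, the 10 workers and the capacity of 1000 mirror `TestQueue`, and the timeout value is arbitrary. Each scan runs on a daemon thread and must finish within the timeout; a scan that misses it ends the run and is reported as a stall, so nobody has to infer a hang from the last line printed.
   
   ```java
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Hypothetical helper: same workload as TestQueue, but each scan is timed out
   // instead of being judged by eye from the console.
   final class ScanWatchdog {
   
       /** Returns how many of `scans` scans finished, stopping at the first one that exceeds timeoutMs. */
       static int completedScans(int scans, long timeoutMs) {
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
           AtomicBoolean running = new AtomicBoolean(true);
   
           // Same churn as the original test: each worker offers, then removes, in a loop.
           for (int i = 0; i < 10; i++) {
               Thread worker = new Thread(() -> {
                   while (running.get()) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               });
               worker.setDaemon(true); // a blocked worker must not keep the JVM alive
               worker.start();
           }
   
           // The scan runs on its own daemon thread so a stuck scan can be abandoned.
           ExecutorService scanner = Executors.newSingleThreadExecutor(r -> {
               Thread t = new Thread(r, "scanner");
               t.setDaemon(true);
               return t;
           });
   
           Callable<Boolean> scanTask = () -> queue.stream().filter(o -> o == null).findFirst().isPresent();
           int completed = 0;
           try {
               for (int i = 0; i < scans; i++) {
                   Future<Boolean> scan = scanner.submit(scanTask);
                   try {
                       scan.get(timeoutMs, TimeUnit.MILLISECONDS);
                       completed++;
                   } catch (TimeoutException stalled) {
                       break; // a scan that misses the deadline is treated as a stall
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                       break;
                   } catch (ExecutionException e) {
                       throw new IllegalStateException("scan failed", e.getCause());
                   }
               }
           } finally {
               running.set(false);    // lets workers that are not blocked exit their loop
               scanner.shutdownNow();
           }
           return completed;
       }
   
       public static void main(String[] args) {
           int done = completedScans(50, 5_000);
           System.out.println(done == 50 ? "all scans finished" : "stalled after " + done + " scans");
       }
   }
   ```
   
   Running `main` prints either that all scans finished or how many completed before a stall; the sketch makes no assumption about which outcome a given JDK produces.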





[GitHub] [rocketmq] XiaoyiPeng commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995780683


   > Hi, @areyouok @RongtongJin @duhenglucky
   > 
   > I reviewed the given test case again: the broker never hangs in your test, @areyouok.
   > 
   > In your test case, you concluded that the **broker hangs** because the console printed "begin scan, i still alive" but never printed "finish scan, i still alive" afterwards, or vice versa.
   > 
   > **That's because the IDE console lied to us!** **Our naked eyes cannot perceive the tiny changes in the console!**
   > 
   > You can use an `AtomicInteger` to count the iterations and print the count together with "begin scan, i still alive" and "finish scan, i still alive", as shown below:
   > 
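   > ```java
   > import java.util.concurrent.LinkedBlockingQueue;
   > import java.util.concurrent.atomic.AtomicInteger;
   > 
   > public class TestLinkedBlockingQueue {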
   >     public static void main(String[] args) throws InterruptedException {
   >         AtomicInteger startScanTimes = new AtomicInteger();
   >         AtomicInteger finishScanTimes = new AtomicInteger();
   >         LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
   >         for (int i = 0; i < 10; i++) {
   >             new Thread(() -> {
   >                 while (true) {
   >                     queue.offer(new Object());
   >                     queue.remove();
   >                 }
   >             }).start();
   >         }
   > 
   >         while (true) {
   >             System.out.println("begin scan, i still alive, times: " + startScanTimes.incrementAndGet());
   >             queue.stream().filter(o -> o != null)
   >                     .findFirst().isPresent();
   >             Thread.sleep(1000);
   >             System.out.println("finish scan, i still alive, times: " + finishScanTimes.incrementAndGet());
   >         }
   >     }
   > }
   > ```
   > 
   > **We can run a fatigue test with this test case: it never hangs and keeps printing the given info forever!**
   > 
   > As for the **potential deadlock**, that was my initial misjudgment. In the methods of `LinkedBlockingQueue`, `putLock` and `takeLock` are never acquired in inconsistent orders: whenever both are held together, they are taken through `fullyLock()`, which always acquires `putLock` before `takeLock`.
   > 
   > Best regards!
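   To make the locking argument concrete, here is a heavily simplified sketch of the two-lock design that `LinkedBlockingQueue` follows. It is not the JDK source: it omits blocking, capacity limits, and iterators, and `TwoLockQueue` is a hypothetical name. Producers take only `putLock`, consumers (`poll()` and `peek()`) take only `takeLock`, and a whole-queue operation such as `contains()` takes both through `fullyLock()`, always in the same order.
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.locks.ReentrantLock;
   
   // Hypothetical, simplified two-lock linked queue (non-blocking, unbounded).
   final class TwoLockQueue<E> {
       private static final class Node<E> {
           E item;
           Node<E> next;
           Node(E item) { this.item = item; }
       }
   
       private final ReentrantLock putLock = new ReentrantLock();
       private final ReentrantLock takeLock = new ReentrantLock();
       private final AtomicInteger count = new AtomicInteger();
       private Node<E> head = new Node<>(null); // dummy head, guarded by takeLock
       private Node<E> last = head;             // tail, guarded by putLock
   
       public boolean offer(E e) {
           if (e == null) throw new NullPointerException();
           putLock.lock(); // producers only ever touch putLock
           try {
               last = last.next = new Node<>(e);
               count.incrementAndGet(); // publishes the new node to consumers
               return true;
           } finally {
               putLock.unlock();
           }
       }
   
       public E poll() {
           if (count.get() == 0) return null;
           takeLock.lock(); // consumers only ever touch takeLock
           try {
               if (count.get() == 0) return null;
               Node<E> first = head.next;
               E x = first.item;
               first.item = null; // the first node becomes the new dummy head
               head = first;
               count.decrementAndGet();
               return x;
           } finally {
               takeLock.unlock();
           }
       }
   
       public E peek() {
           if (count.get() == 0) return null;
           takeLock.lock(); // same lock as poll(), so peek and poll contend with each other
           try {
               Node<E> first = head.next;
               return first == null ? null : first.item;
           } finally {
               takeLock.unlock();
           }
       }
   
       // Whole-queue operations hold both locks, always acquired putLock first.
       private void fullyLock() {
           putLock.lock();
           takeLock.lock();
       }
   
       private void fullyUnlock() {
           takeLock.unlock();
           putLock.unlock();
       }
   
       public boolean contains(Object o) {
           if (o == null) return false;
           fullyLock();
           try {
               for (Node<E> p = head.next; p != null; p = p.next)
                   if (o.equals(p.item)) return true;
               return false;
           } finally {
               fullyUnlock();
           }
       }
   
       public int size() { return count.get(); }
   }
   ```
   
   Because `fullyLock()` is the only path that holds both locks and it always takes `putLock` first, no two threads in this sketch can each hold one lock while waiting for the other.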
   
   





[GitHub] [rocketmq] XiaoyiPeng edited a comment on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng edited a comment on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-995649654


   > This problem only appears in a JDK 8 environment; please make sure your Java version is 8.
   
   Hi, @heihaozi 
   ```
   $ java -version
   java version "1.8.0_281"
   Java(TM) SE Runtime Environment (build 1.8.0_281-b09)
   Java HotSpot(TM) 64-Bit Server VM (build 25.281-b09, mixed mode)
   ```
   The above is my Java version information. 
   
   Could you run a fatigue test with the test case, or point me to the related JDK issue for this problem?
   
   Thank you very much!





[GitHub] [rocketmq] XiaoyiPeng commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
XiaoyiPeng commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r752836279



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()

Review comment:
       Thanks for your review.
   
   It has nothing to do with thread safety; the point is to reduce thread contention. `q` is polled by threads in `BrokerController#sendMessageExecutor` and peeked once per second by a thread in `BrokerController#scheduledExecutorService`, as shown below. Both `poll()` and `peek()` acquire the same lock (`LinkedBlockingQueue#takeLock`).
   ```java
   this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
       @Override
       public void run() {
           try {
               BrokerController.this.printWaterMark();
           } catch (Throwable e) {
               log.error("printWaterMark error.", e);
           }
       }
   }, 10, 1, TimeUnit.SECONDS);
   ```
   You can look at issue #2516 first for some context.







[GitHub] [rocketmq] Jason918 commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-973702615


   Do we have an existing unit test for the public method `headSlowTimeMills`?





[GitHub] [rocketmq] Jason918 commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-973785964


   > > Do we have existing unit test for this public method `headSlowTimeMills`?
   > 
   > No, the previous version didn't have one.
   
   It would be nice if you could provide a unit test for this method to make sure it works properly.
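   A sketch of what such a test could check, kept self-contained so it runs without JUnit or a real `BrokerController`. Everything here is a hypothetical stand-in: `RequestTaskStub` plays the role of `RequestTask`, `castRunnable` mimics `BrokerFastFailure.castRunnable` by returning `null` for anything that is not a request task, and the `LongSupplier` clock replaces `messageStore.now()` so the expected wait time is deterministic. The method body follows the shape of the PR's stream version: take the first queued element that is a request task and subtract its creation timestamp from the current time, otherwise report 0.
   
   ```java
   import java.util.Objects;
   import java.util.Optional;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.function.LongSupplier;
   
   // Hypothetical stand-in for RocketMQ's RequestTask; only the creation time matters here.
   final class RequestTaskStub implements Runnable {
       private final long createTimestamp;
   
       RequestTaskStub(long createTimestamp) {
           this.createTimestamp = createTimestamp;
       }
   
       long getCreateTimestamp() {
           return createTimestamp;
       }
   
       @Override
       public void run() {
           // no-op: the task is never executed in this sketch
       }
   }
   
   final class HeadSlowTimeSketch {
   
       // Hypothetical stand-in for BrokerFastFailure.castRunnable: null for non-request tasks.
       static RequestTaskStub castRunnable(Runnable r) {
           return r instanceof RequestTaskStub ? (RequestTaskStub) r : null;
       }
   
       // Same shape as the PR's version: wait time of the first request task, else 0.
       static long headSlowTimeMills(BlockingQueue<Runnable> q, LongSupplier now) {
           Optional<RequestTaskStub> op = q.stream()
                   .map(HeadSlowTimeSketch::castRunnable)
                   .filter(Objects::nonNull)
                   .findFirst();
           return op.map(rt -> now.getAsLong() - rt.getCreateTimestamp()).orElse(0L);
       }
   
       public static void main(String[] args) {
           BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
           System.out.println(headSlowTimeMills(q, () -> 1_250L)); // prints 0 (empty queue)
   
           q.add(() -> { });                   // not a request task: skipped
           q.add(new RequestTaskStub(1_000L)); // first request task: measured
           q.add(new RequestTaskStub(1_200L));
           System.out.println(headSlowTimeMills(q, () -> 1_250L)); // prints 250
       }
   }
   ```
   
   This only checks the selection logic on a single thread; the concurrency behavior discussed elsewhere in this thread (the stream traversing the queue while other threads offer and poll, on JDK 8) is not exercised by it.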





[GitHub] [rocketmq] WJL3333 commented on a change in pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
WJL3333 commented on a change in pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#discussion_r753740780



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
##########
@@ -650,10 +652,13 @@ public void protectBroker() {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        final Runnable peek = q.peek();
-        if (peek != null) {
-            RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
+        Optional<RequestTask> op = q.stream()
+                .map(BrokerFastFailure::castRunnable)
+                .filter(Objects::nonNull)
+                .findFirst();
+        if (op.isPresent()) {

Review comment:
       My bad. After checking the discussion, I see the code is right.







[GitHub] [rocketmq] RongtongJin commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-974773621


   > 
   
   OK, I got it

