You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/12/11 04:41:22 UTC

[GitHub] [rocketmq] RongtongJin commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

RongtongJin commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991457729


   > This commit cause critical problem. we should rollback it. And I sugguest review PR #3540 again.
   > 
   > @RongtongJin @XiaoyiPeng @duhengforever
   > 
   > The broker hangs in our test:
   > 
   > ```
   > "BrokerControllerScheduledThread1" #30 prio=5 os_prio=31 tid=0x00007fabdb29d800 nid=0x6a03 runnable [0x000070000ae7c000]
   >    java.lang.Thread.State: RUNNABLE
   > 	at java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.tryAdvance(LinkedBlockingQueue.java:950)
   > 	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
   > 	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
   > 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
   > 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   > 	at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
   > 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   > 	at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
   > 	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills(BrokerController.java:658)
   > 	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills4SendThreadPoolQueue(BrokerController.java:672)
   > 	at org.apache.rocketmq.broker.BrokerController.printWaterMark(BrokerController.java:688)
   > 	at org.apache.rocketmq.broker.BrokerController$5.run(BrokerController.java:386)
   > 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   > 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
   > 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
   > 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
   > 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   > 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   > 	at java.lang.Thread.run(Thread.java:748)
   > 
   >    Locked ownable synchronizers:
   > 	- <0x000000078014eeb8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	- <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	- <0x0000000780d2ba58> (a java.util.concurrent.ThreadPoolExecutor$Worker)
   > 
   > "NettyServerCodecThread_8" #81 prio=5 os_prio=31 tid=0x00007fabdb294000 nid=0x13d03 waiting on condition [0x000070000e215000]
   >    java.lang.Thread.State: WAITING (parking)
   > 	at sun.misc.Unsafe.park(Native Method)
   > 	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   > 	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   > 	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   > 	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   > 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   > 	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   > 	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   > 	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   > 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   > 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   > 	at java.lang.Thread.run(Thread.java:748)
   > 
   >    Locked ownable synchronizers:
   > 	- None
   > 
   > "NettyServerCodecThread_6" #79 prio=5 os_prio=31 tid=0x00007fabe209d800 nid=0xc503 waiting on condition [0x000070000e00f000]
   >    java.lang.Thread.State: WAITING (parking)
   > 	at sun.misc.Unsafe.park(Native Method)
   > 	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   > 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   > 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   > 	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   > 	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   > 	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   > 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   > 	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   > 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   > 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   > 	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   > 	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   > 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   > 	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   > 	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   > 	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   > 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   > 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   > 	at java.lang.Thread.run(Thread.java:748)
   > 
   >    Locked ownable synchronizers:
   > 	- None
   > 
   > ...
   > ```
   > 
   > some simple code to re-produce it:
   > 
   > ```
   > public class TestQueue {
   >     public static void main(String[] args) throws Exception {
   >         LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
   >         for (int i = 0; i < 10; i++) {
   >             new Thread(() -> {
   >                 while (true) {
   >                     queue.offer(new Object());
   >                     queue.remove();
   >                 }
   >             }).start();
   >         }
   >         while (true) {
   >             System.out.println("begin scan, i still alive");
   >             queue.stream()
   >                     .filter(o -> o == null)
   >                     .findFirst()
   >                     .isPresent();
   >             Thread.sleep(100);
   >             System.out.println("finish scan, i still alive");
   >         }
   >     }
   > }
   > ```
   
   Good catch!I will fix it ASAP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org