You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/12/11 04:41:22 UTC
[GitHub] [rocketmq] RongtongJin commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time
RongtongJin commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991457729
> This commit cause critical problem. we should rollback it. And I sugguest review PR #3540 again.
>
> @RongtongJin @XiaoyiPeng @duhengforever
>
> The broker hangs in our test:
>
> ```
> "BrokerControllerScheduledThread1" #30 prio=5 os_prio=31 tid=0x00007fabdb29d800 nid=0x6a03 runnable [0x000070000ae7c000]
> java.lang.Thread.State: RUNNABLE
> at java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.tryAdvance(LinkedBlockingQueue.java:950)
> at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
> at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
> at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills(BrokerController.java:658)
> at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills4SendThreadPoolQueue(BrokerController.java:672)
> at org.apache.rocketmq.broker.BrokerController.printWaterMark(BrokerController.java:688)
> at org.apache.rocketmq.broker.BrokerController$5.run(BrokerController.java:386)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> Locked ownable synchronizers:
> - <0x000000078014eeb8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> - <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> - <0x0000000780d2ba58> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "NettyServerCodecThread_8" #81 prio=5 os_prio=31 tid=0x00007fabdb294000 nid=0x13d03 waiting on condition [0x000070000e215000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
> at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
> at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
> at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
> at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
> at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
> at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
> at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
> at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
> at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
> at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
> at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
>
> Locked ownable synchronizers:
> - None
>
> "NettyServerCodecThread_6" #79 prio=5 os_prio=31 tid=0x00007fabe209d800 nid=0xc503 waiting on condition [0x000070000e00f000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
> at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
> at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
> at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
> at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
> at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
> at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
> at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
> at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
> at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
> at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
> at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
>
> Locked ownable synchronizers:
> - None
>
> ...
> ```
>
> some simple code to re-produce it:
>
> ```
> public class TestQueue {
> public static void main(String[] args) throws Exception {
> LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
> for (int i = 0; i < 10; i++) {
> new Thread(() -> {
> while (true) {
> queue.offer(new Object());
> queue.remove();
> }
> }).start();
> }
> while (true) {
> System.out.println("begin scan, i still alive");
> queue.stream()
> .filter(o -> o == null)
> .findFirst()
> .isPresent();
> Thread.sleep(100);
> System.out.println("finish scan, i still alive");
> }
> }
> }
> ```
Good catch!I will fix it ASAP.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org