You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/12/11 03:40:37 UTC
[GitHub] [rocketmq] areyouok commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time
areyouok commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991437424
This commit cause critical problem. we should rollback it. And I sugguest review PR #3540 again.
@RongtongJin @XiaoyiPeng @duhengforever
The broker hangs in our test:
```
"BrokerControllerScheduledThread1" #30 prio=5 os_prio=31 tid=0x00007fabdb29d800 nid=0x6a03 runnable [0x000070000ae7c000]
java.lang.Thread.State: RUNNABLE
at java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.tryAdvance(LinkedBlockingQueue.java:950)
at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills(BrokerController.java:658)
at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills4SendThreadPoolQueue(BrokerController.java:672)
at org.apache.rocketmq.broker.BrokerController.printWaterMark(BrokerController.java:688)
at org.apache.rocketmq.broker.BrokerController$5.run(BrokerController.java:386)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- <0x000000078014eeb8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
- <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
- <0x0000000780d2ba58> (a java.util.concurrent.ThreadPoolExecutor$Worker)
"NettyServerCodecThread_8" #81 prio=5 os_prio=31 tid=0x00007fabdb294000 nid=0x13d03 waiting on condition [0x000070000e215000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
"NettyServerCodecThread_6" #79 prio=5 os_prio=31 tid=0x00007fabe209d800 nid=0xc503 waiting on condition [0x000070000e00f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
...
```
some simple code to re-produce it:
```
public class TestQueue {
public static void main(String[] args) throws Exception {
LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
queue.offer(new Object());
queue.remove();
}
}).start();
}
while (true) {
System.out.println("begin scan, i still alive");
queue.stream()
.filter(o -> o == null)
.findFirst()
.isPresent();
Thread.sleep(100);
System.out.println("finish scan, i still alive");
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org