Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/12/11 03:40:37 UTC

[GitHub] [rocketmq] areyouok commented on pull request #3509: [ISSUE #2516]: Fix the value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time

areyouok commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991437424


   This commit causes a critical problem. We should roll it back. I also suggest reviewing PR #3540 again.
   
   @RongtongJin @XiaoyiPeng @duhengforever 
   
   The broker hangs in our test:
   ```
   "BrokerControllerScheduledThread1" #30 prio=5 os_prio=31 tid=0x00007fabdb29d800 nid=0x6a03 runnable [0x000070000ae7c000]
      java.lang.Thread.State: RUNNABLE
   	at java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.tryAdvance(LinkedBlockingQueue.java:950)
   	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
   	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
   	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
   	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   	at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
   	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   	at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
   	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills(BrokerController.java:658)
   	at org.apache.rocketmq.broker.BrokerController.headSlowTimeMills4SendThreadPoolQueue(BrokerController.java:672)
   	at org.apache.rocketmq.broker.BrokerController.printWaterMark(BrokerController.java:688)
   	at org.apache.rocketmq.broker.BrokerController$5.run(BrokerController.java:386)
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
   	- <0x000000078014eeb8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	- <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	- <0x0000000780d2ba58> (a java.util.concurrent.ThreadPoolExecutor$Worker)
   
   "NettyServerCodecThread_8" #81 prio=5 os_prio=31 tid=0x00007fabdb294000 nid=0x13d03 waiting on condition [0x000070000e215000]
      java.lang.Thread.State: WAITING (parking)
   	at sun.misc.Unsafe.park(Native Method)
   	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
   	- None
   
   "NettyServerCodecThread_6" #79 prio=5 os_prio=31 tid=0x00007fabe209d800 nid=0xc503 waiting on condition [0x000070000e00f000]
      java.lang.Thread.State: WAITING (parking)
   	at sun.misc.Unsafe.park(Native Method)
   	- parking to wait for  <0x000000078014ef00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
   	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
   	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
   	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
   	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
   	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
   	at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
   	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
   	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
   	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
   	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
   	- None
   
   ...
   ```
   
   
   Some simple code to reproduce it:
   ```
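   import java.util.concurrent.LinkedBlockingQueue;
   
   public class TestQueue {
       public static void main(String[] args) throws Exception {
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
           // 10 threads add and remove elements in a tight loop, so the queue's nodes change constantly.
           for (int i = 0; i < 10; i++) {
               new Thread(() -> {
                   while (true) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               }).start();
           }
           // The main thread scans the queue with stream().findFirst(), the same call seen in the trace.
           // If a scan never returns, the "finish scan" line stops appearing.
           while (true) {
               System.out.println("begin scan, i still alive");
               queue.stream()
                       .filter(o -> o == null)
                       .findFirst()
                       .isPresent();
               Thread.sleep(100);
               System.out.println("finish scan, i still alive");
           }
       }
   }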
   ```
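   
   For comparison, a minimal sketch of reading only the head element with `peek()` instead of streaming over the queue. This is an illustration, not the actual broker code or the agreed fix: `HeadWaitSketch`, `TimedTask`, and `headWaitMillis` are hypothetical names, and the assumption is that each queued task records its creation time.
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   
   public class HeadWaitSketch {
       /** Hypothetical stand-in for a queued request that records when it was created. */
       static final class TimedTask implements Runnable {
           final long createTimestamp;
   
           TimedTask(long createTimestamp) {
               this.createTimestamp = createTimestamp;
           }
   
           @Override
           public void run() {
               // No work needed for this illustration.
           }
       }
   
       /**
        * Returns how long the head of the queue has been waiting, in milliseconds.
        * peek() looks only at the first element and does not traverse the queue.
        */
       static long headWaitMillis(LinkedBlockingQueue<Runnable> queue, long nowMillis) {
           Runnable head = queue.peek();
           if (head instanceof TimedTask) {
               return Math.max(0L, nowMillis - ((TimedTask) head).createTimestamp);
           }
           // Empty queue, or a head element that carries no timestamp.
           return 0L;
       }
   
       public static void main(String[] args) {
           LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000);
           System.out.println(headWaitMillis(queue, 1_000L)); // prints 0 (empty queue)
           queue.offer(new TimedTask(400L));
           System.out.println(headWaitMillis(queue, 1_000L)); // prints 600
       }
   }
   ```
   Whether `peek()` alone is enough depends on what the real queue elements look like; the sketch only shows that the head can be inspected without a stream traversal.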

