Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/12 06:48:16 UTC

[GitHub] [pulsar] devinbost edited a comment on issue #6054: Catastrophic frequent random subscription freezes, especially on high-traffic topics.

devinbost edited a comment on issue #6054:
URL: https://github.com/apache/pulsar/issues/6054#issuecomment-839495126


   I traced the call chain for where acks should be getting sent, and I filled in some additional steps:
   
   When `ProducerImpl` sends the messages, it builds `newSend` command instances, which get picked up by `ServerCnx.handleSend(..)`. That goes to `Producer.publishMessage(..)`, then `PersistentTopic.publishMessage(..)`,
   which calls `asyncAddEntry(headersAndPayload, publishContext)`, which calls `[PersistentTopic].ledger.asyncAddEntry(headersAndPayload, (int) publishContext.getNumberOfMessages(), this, publishContext)`.
   That creates the `OpAddEntry` and calls `internalAsyncAddEntry(addOperation)` on a different thread, which adds the `OpAddEntry` to `[ManagedLedgerImpl].pendingAddEntries`.
   
   From somewhere (it's not yet clear to me exactly where), we call `OpAddEntry.safeRun()`, which polls `pendingAddEntries`, gets the callback on the `OpAddEntry` (which is the `PersistentTopic` instance), and calls `[PersistentTopic].addComplete(lastEntry, data.asReadOnly(), ctx)`. That calls `publishContext.completed(..)` on `Producer.MessagePublishContext`, which calls `Producer.ServerCnx.execute(this)` on the `MessagePublishContext`, which in turn calls `MessagePublishContext.run()`. That triggers `Producer.ServerCnx.getCommandSender().sendSendReceiptResponse(..)` [SEND_RECEIPT], which writes a `newSendReceiptCommand` to the channel.
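   
   To make the shape of that chain easier to follow, here is a minimal, self-contained sketch of the pattern: a publish enqueues a pending add operation, and the receipt is only produced when something later completes that operation and fires its callback. None of this is the real Pulsar code. `PublishAckSketch`, `AddOp`, `Ledger`, `Topic`, `completeNext()`, and `sentReceipts` are hypothetical names chosen for illustration, and `completeNext()` merely stands in for whatever ends up invoking `OpAddEntry.safeRun()`.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;
   
   public class PublishAckSketch {
       // Hypothetical callback, loosely modeled on the add-complete callback in the chain above.
       interface AddEntryCallback {
           void addComplete(long entryId, Object ctx);
       }
   
       // Hypothetical pending operation: remembers who to notify and with what context.
       static final class AddOp {
           final AddEntryCallback callback;
           final Object ctx;
   
           AddOp(AddEntryCallback callback, Object ctx) {
               this.callback = callback;
               this.ctx = ctx;
           }
       }
   
       // Hypothetical ledger: queues operations and completes them in FIFO order.
       static final class Ledger {
           private final Queue<AddOp> pendingAddEntries = new ArrayDeque<>();
           private long nextEntryId = 0;
   
           void asyncAddEntry(AddEntryCallback callback, Object ctx) {
               pendingAddEntries.add(new AddOp(callback, ctx));
           }
   
           // Stand-in for the completion step: poll the queue and fire the callback.
           boolean completeNext() {
               AddOp op = pendingAddEntries.poll();
               if (op == null) {
                   return false;
               }
               op.callback.addComplete(nextEntryId++, op.ctx);
               return true;
           }
   
           int pendingCount() {
               return pendingAddEntries.size();
           }
       }
   
       // Hypothetical topic: it is its own callback, and "sends" a receipt on completion.
       static final class Topic implements AddEntryCallback {
           final Ledger ledger = new Ledger();
           final List<String> sentReceipts = new ArrayList<>();
   
           void publishMessage(String publishContext) {
               ledger.asyncAddEntry(this, publishContext);
           }
   
           @Override
           public void addComplete(long entryId, Object ctx) {
               sentReceipts.add(ctx + "@" + entryId);
           }
       }
   
       public static void main(String[] args) {
           Topic topic = new Topic();
           topic.publishMessage("seq-0");
           topic.publishMessage("seq-1");
           System.out.println("receipts before completion: " + topic.sentReceipts);
           topic.ledger.completeNext();
           System.out.println("receipts after one completion: " + topic.sentReceipts);
           System.out.println("still pending: " + topic.ledger.pendingCount());
       }
   }
   ```
   
   In this sketch, if the completion step never runs (or throws before the callback fires), the operation stays in the pending queue and no receipt is ever written, which is the kind of stall this trace is trying to locate.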
   
   So, I added a lot of logging, and to my great surprise, I'm getting an NPE after adding these debug lines to `OpAddEntry.createOpAddEntry(..)`:
   
   ```
       private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
           log.debug("Running OpAddEntry.createOpAddEntry(..)");
           OpAddEntry op = RECYCLER.get();
           log.debug("1. In OpAddEntry.createOpAddEntry, OpAddEntry is {}", op != null ? op.toString() : null);
           op.ml = ml;
           op.ledger = null;
           op.data = data.retain();
           op.dataLength = data.readableBytes();
           op.callback = callback;
           op.ctx = ctx;
           op.addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
           op.closeWhenDone = false;
           op.entryId = -1;
           op.startTime = System.nanoTime();
           op.state = State.OPEN;
           ml.mbean.addAddEntrySample(op.dataLength);
           log.debug("2. In OpAddEntry.createOpAddEntry, OpAddEntry is {}", op != null ? op.toString() : null);
           return op;
       }
   ```
   
   Here's the stack trace:
   
   ```
      2021-05-12T06:14:19,569 [pulsar-io-28-32] DEBUG org.apache.bookkeeper.mledger.impl.OpAddEntry - Running OpAddEntry.createOpAddEntry(..)
       2021-05-12T06:14:19,569 [pulsar-io-28-32] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.20.20.160:37696] Got exception java.lang.NullPointerException
       	at org.apache.bookkeeper.mledger.impl.OpAddEntry.toString(OpAddEntry.java:354)
       	at org.apache.bookkeeper.mledger.impl.OpAddEntry.createOpAddEntry(OpAddEntry.java:100)
       	at org.apache.bookkeeper.mledger.impl.OpAddEntry.create(OpAddEntry.java:81)
       	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncAddEntry(ManagedLedgerImpl.java:689)
       	at org.apache.pulsar.broker.service.persistent.PersistentTopic.asyncAddEntry(PersistentTopic.java:428)
       	at org.apache.pulsar.broker.service.persistent.PersistentTopic.publishMessage(PersistentTopic.java:404)
       	at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:224)
       	at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:161)
       	at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1372)
       	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:207)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
       	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
       	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
       	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
       	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1376)
       	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1265)
       	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1302)
       	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
       	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
       	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
       	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
       	at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425)
       	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
       	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
       	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
       	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
       	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
       	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
       	at java.lang.Thread.run(Thread.java:748)
   ```
   
   The only place we could be throwing an NPE is here:
   `log.debug("1. In OpAddEntry.createOpAddEntry, OpAddEntry is {}", op != null ? op.toString() : null);`
   
   So, `op` isn't null until the null check passes?
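   
   One possible reading of the trace: the top frame is `OpAddEntry.toString(OpAddEntry.java:354)`, not `createOpAddEntry` itself, so `op` did pass the null check and the NPE was thrown inside `toString()`. The first debug line runs before `op.ml` and the other fields are assigned, so if `toString()` dereferences one of those fields (an assumption, since the body of `toString()` isn't shown here), a fresh or recycled instance would fail exactly this way. A minimal sketch of that failure mode, using hypothetical stand-in classes (`RecycledToStringDemo`, `PooledOp`, `Ledger`) rather than the real `OpAddEntry`:
   
   ```java
   public class RecycledToStringDemo {
       // Hypothetical stand-in for the managed ledger reference held by the op.
       static final class Ledger {
           private final String name;
   
           Ledger(String name) {
               this.name = name;
           }
   
           String getName() {
               return name;
           }
       }
   
       // Hypothetical stand-in for a pooled op whose fields are set after it leaves the pool.
       static final class PooledOp {
           Ledger ml; // still null right after the instance is obtained
   
           // Assumed shape of the failing toString(): dereferences ml unconditionally.
           @Override
           public String toString() {
               return "PooledOp{ml=" + ml.getName() + "}";
           }
   
           // Null-safe variant that can be logged before the fields are populated.
           String safeToString() {
               return "PooledOp{ml=" + (ml != null ? ml.getName() : "null") + "}";
           }
       }
   
       // Mirrors the debug line: the null check on op passes, but toString() may still throw.
       static boolean describeThrowsNpe(PooledOp op) {
           try {
               String description = op != null ? op.toString() : null;
               return false;
           } catch (NullPointerException e) {
               return true;
           }
       }
   
       public static void main(String[] args) {
           PooledOp op = new PooledOp();
           System.out.println("NPE before fields are set: " + describeThrowsNpe(op));
           System.out.println("null-safe description: " + op.safeToString());
           op.ml = new Ledger("my-ledger");
           System.out.println("NPE after fields are set: " + describeThrowsNpe(op));
       }
   }
   ```
   
   If that reading holds, the NPE would be a side effect of where the debug line was placed rather than a sign of the original freeze, and moving the first log statement after the field assignments (or making `toString()` null-safe) would avoid it.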

