You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Keith Wall (JIRA)" <ji...@apache.org> on 2018/06/20 07:10:00 UTC

[jira] [Resolved] (QPID-8210) [Broker-J] Consumer attach when message is being released and requeued can end-up in broker crash due to unhandled NullPointerException

     [ https://issues.apache.org/jira/browse/QPID-8210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Keith Wall resolved QPID-8210.
------------------------------
    Resolution: Fixed

> [Broker-J]  Consumer attach when message is being released and requeued can end-up in broker crash due to unhandled NullPointerException
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: QPID-8210
>                 URL: https://issues.apache.org/jira/browse/QPID-8210
>             Project: Qpid
>          Issue Type: Bug
>          Components: Broker-J
>    Affects Versions: qpid-java-broker-7.0.3, qpid-java-broker-7.0.2, qpid-java-broker-7.0.0, qpid-java-broker-7.0.1, qpid-java-broker-7.0.4, qpid-java-broker-7.0.5
>            Reporter: Alex Rudyy
>            Priority: Critical
>             Fix For: qpid-java-broker-7.0.6
>
>
> The consumer node is currently set after consumer is added into queue consumer manager. When released message is requeued all consumers in consumer manager are iterated and AbstractQueue#updateSubRequeueEntry is invoke for every of them to update the released entry in consumer queue context if required. The notification of consumer without a consumer node can result in NullPointerException:
> {noformat}
> java.lang.NullPointerException
>     at org.apache.qpid.server.queue.QueueConsumerManagerImpl.setNotified(QueueConsumerManagerImpl.java:151)
>     at org.apache.qpid.server.queue.AbstractQueue.notifyConsumer(AbstractQueue.java:2268)
>     at org.apache.qpid.server.queue.AbstractQueue.updateSubRequeueEntry(AbstractQueue.java:1382)
>     at org.apache.qpid.server.queue.AbstractQueue.resetSubPointers(AbstractQueue.java:1413)
>     at org.apache.qpid.server.queue.AbstractQueue.requeue(AbstractQueue.java:1399)
>     at org.apache.qpid.server.queue.QueueEntryImpl.postRelease(QueueEntryImpl.java:446)
>     at org.apache.qpid.server.queue.QueueEntryImpl.release(QueueEntryImpl.java:437)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.requeue(AMQChannel.java:896)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.close(AMQChannel.java:787)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.closeChannel(AMQPConnection_0_8Impl.java:459)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.closeChannel(AMQPConnection_0_8Impl.java:430)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.receiveChannelClose(AMQChannel.java:2167)
>     at org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody.process(ChannelCloseBody.java:140)
>     at org.apache.qpid.server.protocol.v0_8.ServerDecoder.processMethod(ServerDecoder.java:132)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processFrame(AMQDecoder.java:203)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.doProcessFrame(BrokerDecoder.java:141)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.processFrame(BrokerDecoder.java:65)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processInput(AMQDecoder.java:185)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder$1.run(BrokerDecoder.java:104)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder$1.run(BrokerDecoder.java:97)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.processAMQPFrames(BrokerDecoder.java:96)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.decode(AMQDecoder.java:118)
>     at org.apache.qpid.server.protocol.v0_8.ServerDecoder.decodeBuffer(ServerDecoder.java:44)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl$1.run(AMQPConnection_0_8Impl.java:257)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl$1.run(AMQPConnection_0_8Impl.java:249)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.received(AMQPConnection_0_8Impl.java:248)
>     at org.apache.qpid.server.transport.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:135)
>     at org.apache.qpid.server.transport.NonBlockingConnection.processAmqpData(NonBlockingConnection.java:610)
>     at org.apache.qpid.server.transport.NonBlockingConnectionPlainDelegate.processData(NonBlockingConnectionPlainDelegate.java:58)
>     at org.apache.qpid.server.transport.NonBlockingConnection.doRead(NonBlockingConnection.java:496)
>     at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:270)
>     at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:134)
>     at org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.processConnection(SelectorThread.java:575)
>     at org.apache.qpid.server.transport.SelectorThread$SelectionTask.performSelect(SelectorThread.java:366)
>     at org.apache.qpid.server.transport.SelectorThread$SelectionTask.run(SelectorThread.java:97)
>     at org.apache.qpid.server.transport.SelectorThread.run(SelectorThread.java:533)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at org.apache.qpid.server.bytebuffer.QpidByteBufferFactory.lambda$null$0(QpidByteBufferFactory.java:464)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The issue can happen in the following scenarios:
>  * when previously acquired message is released due to consumer/session/connection close and another consumer is attached to the queue at the same time.
>  * when transaction is rolled back or message is released (for example, JMS session Session#recover() is called for client_ack session) and a new competing consumer is attached at the same time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org