You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Christopher L. Shannon (JIRA)" <ji...@apache.org> on 2015/06/11 03:09:01 UTC

[jira] [Commented] (AMQ-5668) NPE in kahadb with concurrentStoreAndDispatchTopics when sending MQTT msgs with different QoS

    [ https://issues.apache.org/jira/browse/AMQ-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14581313#comment-14581313 ] 

Christopher L. Shannon commented on AMQ-5668:
---------------------------------------------

[~tmielke],

Thanks for the bug report and for your test case.  I took a look at this and there is a race condition.  As you pointed out, the best work around is probably to use concurrentStoreAndDispatchTopics="false" for now.

In detail, the issue lies in the setLastCacheId method in AbstractStoreCursor which you can see in the stack trace.  In the else clause in that method lastCachedIds[ index].getFutureOrSequenceLong() is returning null because it hasn't been set yet.  KahaDB assigns that value when the message is added to the store in a transaction.   However the TopicStorePefetch cursor is calling trackLastCached before KahaDB actually gets around to finishing the transaction and assigning the value, which happens asynchronously.

[~tabish121] and [~gtully], What do you guys thisnk about this?  It's certainly easy enough to add a null check but then the value is never cached in setLastCachedId as it just skips it.  This may not actually be a problem depending on what the lastCacheId is used for.  If I add a null check in, the test case passes but it seems like there could be a larger concurrency issue here that might need to be resolved besides just adding in a check for a null.


> NPE in kahadb with concurrentStoreAndDispatchTopics when sending MQTT msgs with different QoS
> ---------------------------------------------------------------------------------------------
>
>                 Key: AMQ-5668
>                 URL: https://issues.apache.org/jira/browse/AMQ-5668
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, KahaDB, MQTT
>    Affects Versions: 5.11.1
>         Environment: MQTT, KahaDB
>            Reporter: Torsten Mielke
>              Labels: broker, kahadb, mqtt
>         Attachments: AMQ-5668Test.tgz
>
>
> Running KahaDB with concurrentStoreAndDispatchTopics="true" and sending 3 MQTT messages using different QoS values raises 
> {code}
> 2015-03-17 13:27:48,866 WARN ActiveMQ NIO Worker 2 - Failed to send MQTT Publish:
> java.lang.NullPointerException
> 	at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.setLastCachedId(AbstractStoreCursor.java:319)
> 	at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.trackLastCached(AbstractStoreCursor.java:280)
> 	at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.addMessageLast(AbstractStoreCursor.java:213)
> 	at org.apache.activemq.broker.region.cursors.TopicStorePrefetch.addMessageLast(TopicStorePrefetch.java:74)
> 	at org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.addMessageLast(StoreDurableSubscriberCursor.java:198)
> 	at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:159)
> 	at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:274)
> 	at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
> 	at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:717)
> 	at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:510)
> 	at org.apache.activemq.broker.region.Topic.send(Topic.java:441)
> 	at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:419)
> 	at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:468)
> 	at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
> 	at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:152)
> 	at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
> 	at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307)
> 	at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:157)
> 	at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:541)
> 	at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)
> 	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)
> 	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
> 	at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
> 	at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:147)
> 	at org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:106)
> 	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:173)
> 	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTPublish(MQTTProtocolConverter.java:445)
> 	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:210)
> 	at org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:94)
> 	at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> 	at org.apache.activemq.transport.mqtt.MQTTCodec$1.onFrame(MQTTCodec.java:54)
> 	at org.apache.activemq.transport.mqtt.MQTTCodec.processCommand(MQTTCodec.java:79)
> 	at org.apache.activemq.transport.mqtt.MQTTCodec.access$400(MQTTCodec.java:26)
> 	at org.apache.activemq.transport.mqtt.MQTTCodec$4.parse(MQTTCodec.java:194)
> 	at org.apache.activemq.transport.mqtt.MQTTCodec$3.parse(MQTTCodec.java:160)
> 	at org.apache.activemq.transport.mqtt.MQTTCodec$2.parse(MQTTCodec.java:123)
> 	at org.apache.activemq.transport.mqtt.MQTTCodec.parse(MQTTCodec.java:65)
> 	at org.apache.activemq.transport.mqtt.MQTTNIOTransport.serviceRead(MQTTNIOTransport.java:105)
> 	at org.apache.activemq.transport.mqtt.MQTTNIOTransport.access$000(MQTTNIOTransport.java:43)
> 	at org.apache.activemq.transport.mqtt.MQTTNIOTransport$1.onSelect(MQTTNIOTransport.java:66)
> 	at org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:97)
> 	at org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:744)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)