You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Christopher L. Shannon (JIRA)" <ji...@apache.org> on 2015/07/02 18:04:04 UTC

[jira] [Updated] (AMQ-5872) MQTT subscribe fails after unsubscribe

     [ https://issues.apache.org/jira/browse/AMQ-5872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christopher L. Shannon updated AMQ-5872:
----------------------------------------
    Attachment: MQTTNullPointerTest.java

> MQTT subscribe fails after unsubscribe
> --------------------------------------
>
>                 Key: AMQ-5872
>                 URL: https://issues.apache.org/jira/browse/AMQ-5872
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: KahaDB
>    Affects Versions: 5.11.1
>         Environment: Windows Server 2012
>            Reporter: Mathia Van De Poel
>         Attachments: MQTTNullPointerTest.java
>
>
> We have a problem when subscribing to a topic again after unsubscribing from that same topic. The messages are put in a queue (the queue is created on the server), but they are not delivered to the client.
> We use .NET M2MQTT as our client library.
> Full scenario:
> 1) Connect to server.
> 2) Subscribe to topic "TEST"
> 3) Publish message "TEST 1" to topic "TEST" => message is received
> 4) Unsubscribe from topic "TEST"
> 5) Subscribe to topic "TEST"
> 6) Publish message "TEST 2" to topic "TEST" => message is not received
> 7) Subscribe to topic "TEST" => message "TEST 2" is received
> 8) Publish message "TEST 3" to topic "TEST" => message is received
> On our server, we see the following error:
> 2015-07-02 10:57:05,621 | WARN  | Error subscribing to TEST | org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy | ActiveMQ Transport: tcp:///10.2.6.86:56778@8884
> java.lang.NullPointerException
> 	at org.apache.activemq.store.kahadb.MessageDatabase.addAckLocationForRetroactiveSub(MessageDatabase.java:2220)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.MessageDatabase.updateIndex(MessageDatabase.java:1472)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.MessageDatabase$15.execute(MessageDatabase.java:1207)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.MessageDatabase.process(MessageDatabase.java:1204)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.MessageDatabase$10.visit(MessageDatabase.java:1103)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand.visit(KahaSubscriptionCommand.java:187)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.MessageDatabase.process(MessageDatabase.java:1070)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.MessageDatabase.store(MessageDatabase.java:977)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.MessageDatabase.store(MessageDatabase.java:957)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.addSubscription(KahaDBStore.java:796)[activemq-kahadb-store-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.store.ProxyTopicMessageStore.addSubscription(ProxyTopicMessageStore.java:98)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.region.Topic.activate(Topic.java:258)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:121)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.region.Topic.addSubscription(Topic.java:160)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:319)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:163)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:427)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:244)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:104)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:107)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:663)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:348)[activemq-client-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)[activemq-client-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:147)[activemq-mqtt-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:106)[activemq-mqtt-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:173)[activemq-mqtt-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy.doSubscribe(AbstractMQTTSubscriptionStrategy.java:200)[activemq-mqtt-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.mqtt.strategy.MQTTDefaultSubscriptionStrategy.onSubscribe(MQTTDefaultSubscriptionStrategy.java:87)[activemq-mqtt-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy.onSubscribe(AbstractMQTTSubscriptionStrategy.java:108)[activemq-mqtt-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onSubscribe(MQTTProtocolConverter.java:352)[activemq-mqtt-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:204)[activemq-mqtt-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:94)[activemq-mqtt-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)[activemq-client-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)[activemq-client-5.11.1.jar:5.11.1]
> 	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)[activemq-client-5.11.1.jar:5.11.1]
> 	at java.lang.Thread.run(Unknown Source)[:1.8.0_31]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)