You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Mathia Van De Poel (JIRA)" <ji...@apache.org> on 2015/07/02 11:39:04 UTC

[jira] [Created] (AMQ-5872) MQTT subscribe fails after unsubscribe

Mathia Van De Poel created AMQ-5872:
---------------------------------------

             Summary: MQTT subscribe fails after unsubscribe
                 Key: AMQ-5872
                 URL: https://issues.apache.org/jira/browse/AMQ-5872
             Project: ActiveMQ
          Issue Type: Bug
          Components: KahaDB
    Affects Versions: 5.11.1
         Environment: Windows server 2012
            Reporter: Mathia Van De Poel


We have a problem subscribing after unsubscribing (same topic). The messages are put in a queue, but not delivered to the client (the queue is created on the server).

We use .NET M2MQTT as our client library.

Full scenario:

1) Connect to server.
2) Subscribe to topic "TEST"
3) Publish message "TEST 1" to topic "TEST" => message is received
4) Unsubscribe from topic "TEST"
5) Subscribe to topic "TEST"
6) Publish message "TEST 2" to topic "TEST" => message is not received
7) Subscribe to topic "TEST" => message "TEST 2" is received
8) Publish message "TEST 3" to topic "TEST" => message is received

On our server, we see the following error:
2015-07-02 10:57:05,621 | WARN  | Error subscribing to TEST | org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy | ActiveMQ Transport: tcp:///10.2.6.86:56778@8884
java.lang.NullPointerException
	at org.apache.activemq.store.kahadb.MessageDatabase.addAckLocationForRetroactiveSub(MessageDatabase.java:2220)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.MessageDatabase.updateIndex(MessageDatabase.java:1472)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.MessageDatabase$15.execute(MessageDatabase.java:1207)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.MessageDatabase.process(MessageDatabase.java:1204)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.MessageDatabase$10.visit(MessageDatabase.java:1103)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand.visit(KahaSubscriptionCommand.java:187)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.MessageDatabase.process(MessageDatabase.java:1070)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.MessageDatabase.store(MessageDatabase.java:977)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.MessageDatabase.store(MessageDatabase.java:957)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.addSubscription(KahaDBStore.java:796)[activemq-kahadb-store-5.11.1.jar:5.11.1]
	at org.apache.activemq.store.ProxyTopicMessageStore.addSubscription(ProxyTopicMessageStore.java:98)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.region.Topic.activate(Topic.java:258)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:121)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.region.Topic.addSubscription(Topic.java:160)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:319)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:163)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:427)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:244)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:104)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:107)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:663)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:348)[activemq-client-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)[activemq-client-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:147)[activemq-mqtt-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:106)[activemq-mqtt-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:173)[activemq-mqtt-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy.doSubscribe(AbstractMQTTSubscriptionStrategy.java:200)[activemq-mqtt-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.mqtt.strategy.MQTTDefaultSubscriptionStrategy.onSubscribe(MQTTDefaultSubscriptionStrategy.java:87)[activemq-mqtt-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy.onSubscribe(AbstractMQTTSubscriptionStrategy.java:108)[activemq-mqtt-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onSubscribe(MQTTProtocolConverter.java:352)[activemq-mqtt-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:204)[activemq-mqtt-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:94)[activemq-mqtt-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)[activemq-client-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)[activemq-client-5.11.1.jar:5.11.1]
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)[activemq-client-5.11.1.jar:5.11.1]
	at java.lang.Thread.run(Unknown Source)[:1.8.0_31]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)