Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2013/06/12 23:12:20 UTC

[jira] [Commented] (AMQ-4576) MQTT BlockingConnection.receive fails when subscribing multiple topics

    [ https://issues.apache.org/jira/browse/AMQ-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681640#comment-13681640 ] 

Timothy Bish commented on AMQ-4576:
-----------------------------------

This will probably need a bit of thought.  The broker is doing what it was designed to do here: it allows only one durable subscriber for a given client Id and subscription name.  The MQTT code as it currently stands uses the client Id as the subscription name whenever a subscribe occurs, so each topic in a multi-topic subscribe ends up with the same client Id and subscription name pair.
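
To make the collision concrete, here is a minimal sketch of the rule described above. It is not the broker's actual code: the Registry class and its subscribe method are hypothetical stand-ins, and the only assumption is that active durable subscriptions are tracked by the (client Id, subscription name) pair.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DurableSubscriptionSketch {

    /** Hypothetical stand-in for the broker's bookkeeping of active durable subscribers. */
    public static final class Registry {
        // Key: [clientId, subscriptionName]; value: the topic the subscription is bound to.
        private final Map<List<String>, String> active = new HashMap<>();

        /** Returns true if the subscription was registered, false if the pair is already in use. */
        public boolean subscribe(String clientId, String subscriptionName, String topic) {
            List<String> key = Arrays.asList(clientId, subscriptionName);
            return active.putIfAbsent(key, topic) == null;
        }
    }

    public static void main(String[] args) {
        String clientId = "6056@3232261834SOC"; // client Id taken from the broker log in the report
        Registry registry = new Registry();

        // Client Id reused as the subscription name, as the MQTT code currently does.
        System.out.println(registry.subscribe(clientId, clientId, "TopicA")); // true
        System.out.println(registry.subscribe(clientId, clientId, "TopicB")); // false: pair already in use

        // A different subscription name (purely illustrative) does not collide.
        System.out.println(registry.subscribe(clientId, "other-name", "TopicB")); // true
    }
}
```

The second call is the situation in the report: TopicB maps onto the same pair as TopicA, which matches the "Durable consumer is in use" warning in the broker log.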
                
> MQTT BlockingConnection.receive fails when subscribing multiple topics
> ----------------------------------------------------------------------
>
>                 Key: AMQ-4576
>                 URL: https://issues.apache.org/jira/browse/AMQ-4576
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Pedro Marques
>
> When more than one topic is passed to BlockingConnection.subscribe, BlockingConnection.receive fails and the following exception is thrown:
> {code}
> java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
> 	at org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
> 	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> The server logs the following messages:
> {code}
> 2013-06-06 15:06:00,125 WARN  [org.apache.activemq.transport.mqtt.MQTTProtocolConverter] (ActiveMQ BrokerService[localhost] Task-1) Exception occurred processing: 
> null: javax.jms.JMSException: Durable consumer is in use for client: 6056@3232261834SOC and subscriptionName: 6056@3232261834SOC
> 2013-06-06 15:06:00,130 WARN  [org.apache.activemq.broker.TransportConnection] (ActiveMQ Transport: tcp:///127.0.0.1:53389@1883) Failed to add Connection ID:LTD-SFW004-53303-1370527418664-2:14, reason: javax.jms.InvalidClientIDException: Broker: localhost - Client: 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
> 2013-06-06 15:06:00,130 WARN  [org.apache.activemq.broker.TransportConnection.Transport] (ActiveMQ Transport: tcp:///127.0.0.1:53389@1883) Transport Connection to: tcp://127.0.0.1:53389 failed: java.io.IOException: Broker: localhost - Client: 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
> 2013-06-06 15:06:00,130 ERROR [pt.intellicare.onecare.mqtt.OneCareFuseMqttClient] (DefaultQuartzScheduler_Worker-8) Problem receiving mqtt messages: java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
> 	at org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331) [:1.5-SNAPSHOT]
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) [:1.17]
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) [:1.17]
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) [:1.17]
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) [:1.17]
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) [:1.17]
> {code}
> Code example:
> {code}
> MQTT mqtt = new MQTT();
> mqtt.setHost(url);
> mqtt.setClientId(clientId);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setCleanSession(false);
> BlockingConnection connection = mqtt.blockingConnection();
> connection.connect();
> Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", QoS.EXACTLY_ONCE)};
> byte[] qoses = connection.subscribe(topics);
> while (true) {
>     Message message = connection.receive();
>     byte[] payload = message.getPayload();
>     String messageContent = new String(payload);
>     System.out.println("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
>     message.ack();
> }
> {code}
> The test failed when using the current fusesource client (1.5) on ActiveMQ 5.9; against Mosquitto the same code works correctly.
