Posted to dev@activemq.apache.org by "Pedro Marques (JIRA)" <ji...@apache.org> on 2013/06/17 20:17:21 UTC
[jira] [Updated] (AMQ-4585) MQTT BlockingConnection.receive fails
when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pedro Marques updated AMQ-4585:
-------------------------------
Description:
The system throws at least three different types of exceptions when a subscriber receives the first pending message after reconnecting without cleaning the session. The test case is: receive several messages from a publisher, close the subscriber connection, then reconnect with setCleanSession(false) and attempt to read the messages published while the subscriber was disconnected.
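As a reading aid, the behaviour this test relies on can be modelled in a few lines: with a persistent session the broker is expected to queue messages for a disconnected subscriber and hand them over on reconnect, while a clean session discards them. The sketch below is only a toy in-memory model. `SessionModel` and its methods are hypothetical names, none of it is ActiveMQ or mqtt-client code, and it assumes a single topic to which every client is subscribed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model of MQTT session persistence (hypothetical, for illustration only). */
public class SessionModel {
    // Messages queued for persistent sessions whose client is currently offline.
    private final Map<String, Queue<String>> offlineQueues = new HashMap<>();
    // Connected clients, mapped to the cleanSession flag they connected with.
    private final Map<String, Boolean> connected = new HashMap<>();

    /** Connects a client and returns the messages delivered from its stored session. */
    public List<String> connect(String clientId, boolean cleanSession) {
        Queue<String> queued = offlineQueues.remove(clientId);
        connected.put(clientId, cleanSession);
        if (cleanSession || queued == null) {
            return new ArrayList<>(); // clean session: any stored state is discarded
        }
        return new ArrayList<>(queued);
    }

    /** Disconnects a client; only a non-clean session is kept for later. */
    public void disconnect(String clientId) {
        Boolean clean = connected.remove(clientId);
        if (clean != null && !clean) {
            offlineQueues.put(clientId, new ArrayDeque<>());
        }
    }

    /** Publishes a message; this model only tracks delivery to offline sessions. */
    public void publish(String message) {
        for (Queue<String> queue : offlineQueues.values()) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        SessionModel broker = new SessionModel();
        broker.connect("subscriber", false);
        broker.disconnect("subscriber");
        broker.publish("TestMessage: 0");
        broker.publish("TestMessage: 1");
        // Reconnecting without cleaning the session should yield both messages.
        System.out.println(broker.connect("subscriber", false));
    }
}
```

In the failing test, the step that corresponds to the second `connect` call is where the exceptions appear.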
The exceptions thrown:
{code}
java.net.ProtocolException: Command from server contained an invalid message id: 1
at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}
{code}
java.lang.ArrayIndexOutOfBoundsException: 0
at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}
{code}
java.net.ProtocolException: Unexpected MQTT command type: 0
at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}
The problem does not always occur, but it occurs most of the time on the first reconnection attempt. With setCleanSession(true) the system works fine.
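For reading the traces above: in MQTT 3.1 the message type is the upper four bits of the first fixed-header byte, and the value 0 is reserved. A frame decoded as type 0, or a PUBREC with no message id bytes, may therefore mean the client read frame boundaries or contents incorrectly. That is an interpretation, not something the traces confirm. The sketch below decodes that first byte. `FixedHeaderSketch` is a hypothetical helper and is not how the mqtt-client codec is implemented.

```java
/** Decodes the first byte of an MQTT 3.1 fixed header (illustrative helper only). */
public class FixedHeaderSketch {
    // Message type names indexed by the 4-bit type value from the MQTT 3.1 spec.
    private static final String[] TYPE_NAMES = {
        "RESERVED(0)", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC",
        "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
        "PINGREQ", "PINGRESP", "DISCONNECT", "RESERVED(15)"
    };

    /** Bits 7-4: message type. */
    public static int messageType(byte first) {
        return (first & 0xF0) >>> 4;
    }

    /** Bit 3: duplicate delivery flag. */
    public static boolean dup(byte first) {
        return (first & 0x08) != 0;
    }

    /** Bits 2-1: QoS level. */
    public static int qos(byte first) {
        return (first & 0x06) >>> 1;
    }

    /** Bit 0: retain flag. */
    public static boolean retain(byte first) {
        return (first & 0x01) != 0;
    }

    public static String typeName(byte first) {
        return TYPE_NAMES[messageType(first)];
    }

    public static void main(String[] args) {
        // Sample header bytes chosen for illustration, not captured from the failing run.
        byte[] samples = {(byte) 0x32, (byte) 0x50, (byte) 0x00};
        for (byte b : samples) {
            System.out.println(typeName(b) + " qos=" + qos(b) + " dup=" + dup(b));
        }
    }
}
```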
Code sample (publisher, permanently running):
{code}
MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setClientId("test_id");
int i = 0;
while (true) {
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    String message = "TestMessage: " + i;
    connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, false);
    System.out.println("Vendor: Sent message.");
    Thread.sleep(2500);
    connection.disconnect();
    Thread.sleep(2500);
    i++;
}
{code}
Code sample (subscriber, fails multiple times when restarting after the connection is closed):
{code}
MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setClientId(clientId);
mqtt.setUserName(user);
mqtt.setPassword(password);
// Keep the session so messages published while offline are delivered on reconnect.
mqtt.setCleanSession(false);
// Declared outside the try block so that it is visible in finally.
BlockingConnection connection = mqtt.blockingConnection();
try {
    connection.connect();
    Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
    byte[] qoses = connection.subscribe(topics);
    int numMessages = 1;
    // Receive nine messages, then fall through to disconnect.
    while (numMessages % 10 != 0) {
        Message message = connection.receive();
        byte[] payload = message.getPayload();
        String messageContent = new String(payload);
        System.out.println("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
        message.ack();
        numMessages++;
    }
} finally {
    try {
        connection.disconnect();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
{code}
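Note that the publisher sends with QoS.AT_LEAST_ONCE (QoS 1) while the subscriber requests QoS.EXACTLY_ONCE (QoS 2). Under the MQTT specification a message is delivered to a subscriber at the lower of the published QoS and the granted subscription QoS, so a compliant broker would be expected to deliver these messages at QoS 1. A minimal sketch of that rule follows. `EffectiveQos` is a hypothetical helper, and it assumes the broker granted the requested level (the `qoses` array returned by `subscribe` presumably holds the granted levels).

```java
/** Computes the delivery QoS under the MQTT "lower of the two" rule (illustrative). */
public class EffectiveQos {
    /**
     * @param publishedQos QoS level (0, 1 or 2) the publisher used
     * @param grantedQos   QoS level (0, 1 or 2) granted for the subscription
     * @return the QoS level used for delivery to the subscriber
     */
    public static int deliveryQos(int publishedQos, int grantedQos) {
        if (publishedQos < 0 || publishedQos > 2 || grantedQos < 0 || grantedQos > 2) {
            throw new IllegalArgumentException("QoS levels must be 0, 1 or 2");
        }
        return Math.min(publishedQos, grantedQos);
    }

    public static void main(String[] args) {
        // Publisher at QoS 1, subscription granted at QoS 2, as in the samples above.
        System.out.println(deliveryQos(1, 2));
    }
}
```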
The test failed when using the current fusesource client (1.5) against ActiveMQ 5.9; against Mosquitto the same code works correctly.
was: the same description without the final sentence about the client and broker versions.
> MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
> --------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-4585
> URL: https://issues.apache.org/jira/browse/AMQ-4585
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.8.0
> Reporter: Pedro Marques