You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/06 16:01:44 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6345
Repository: activemq
Updated Branches:
refs/heads/master c02bc6484 -> 6dacef1c9
https://issues.apache.org/jira/browse/AMQ-6345
The MQTT transport now throws an exception when a PINGREQ is sent to
the broker before a CONNECT packet has been received, because the spec
requires CONNECT to be the first packet sent.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6dacef1c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6dacef1c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6dacef1c
Branch: refs/heads/master
Commit: 6dacef1c9552edbad656c31d784179c2cd179b2e
Parents: c02bc64
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Jul 6 15:59:25 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Jul 6 15:59:25 2016 +0000
----------------------------------------------------------------------
.../transport/mqtt/MQTTProtocolConverter.java | 1 +
.../activemq/transport/mqtt/MQTTTest.java | 122 +++++++++++++++++++
2 files changed, 123 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/6dacef1c/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index e693327..23ca5fa 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -196,6 +196,7 @@ public class MQTTProtocolConverter {
switch (frame.messageType()) {
case PINGREQ.TYPE:
LOG.debug("Received a ping from client: " + getClientId());
+ checkConnected();
sendToMQTT(PING_RESP_FRAME);
LOG.debug("Sent Ping Response to " + getClientId());
break;
http://git-wip-us.apache.org/repos/asf/activemq/blob/6dacef1c/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 5e28b2a..227ade6 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.EOFException;
+import java.lang.reflect.Method;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -56,13 +58,21 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.fusesource.hawtdispatch.transport.Transport;
import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.Callback;
+import org.fusesource.mqtt.client.CallbackConnection;
+import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.MQTTProtocolCodec;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Test;
import org.slf4j.Logger;
@@ -1961,4 +1971,116 @@ public class MQTTTest extends MQTTTestSupport {
connection.disconnect();
}
+
+ @Test(timeout = 15 * 1000, expected=EOFException.class)
+ public void testPingReqWithoutConnectFail31() throws Exception {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("clientId");
+ mqtt.setVersion("3.1");
+ testPingReqWithoutConnectFail(mqtt);
+ }
+
+ @Test(timeout = 15 * 1000, expected=EOFException.class)
+ public void testPingReqWithoutConnectFail311() throws Exception {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("clientId");
+ mqtt.setVersion("3.1.1");
+ testPingReqWithoutConnectFail(mqtt);
+ }
+
+ @Test(timeout = 15 * 1000)
+ public void testPingReqConnectSuccess31() throws Exception {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("clientId");
+ mqtt.setVersion("3.1");
+ testPingReqConnectSuccess(mqtt);
+ }
+
+ @Test(timeout = 15 * 1000)
+ public void testPingReqConnectSuccess311() throws Exception {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("clientId");
+ mqtt.setVersion("3.1.1");
+ testPingReqConnectSuccess(mqtt);
+ }
+
+ private void testPingReqWithoutConnectFail(final MQTT mqtt) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Transport> transport = new AtomicReference<>();
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Callback<Transport> con = new Callback<Transport>() {
+
+ @Override
+ public void onSuccess(Transport value) {
+ transport.set(value);
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(Throwable value) {
+ error.set(value);
+ latch.countDown();
+ }
+ };
+
+ //Connect to the transport by invoking the createTransport method with a custom callback.
+ //This ensures that we connect without sending a CONNECT packet for testing
+ //and that frames are not received automatically, so the test reads them itself below
+ CallbackConnection connection = new CallbackConnection(mqtt);
+ Method createTransportMethod = connection.getClass().getDeclaredMethod("createTransport", Callback.class);
+ createTransportMethod.setAccessible(true);
+ createTransportMethod.invoke(connection, con);
+ latch.await();
+
+ //Make sure no error on connect
+ if (error.get() != null) {
+ LOG.error(error.get().getMessage(), error.get());
+ fail(error.get().getMessage());
+ }
+
+ //Send a PINGREQ without a connect packet first
+ final MQTTProtocolCodec codec = new MQTTProtocolCodec();
+ codec.setTransport(transport.get());
+ transport.get().offer(new PINGREQ().encode());
+
+ //Protocol should throw an exception since we never sent a CONNECT
+ Wait.waitFor(new Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ //Wait for exception to be thrown
+ codec.read();
+ return false;
+ }
+ }, 5000, 100);
+
+ }
+
+ private void testPingReqConnectSuccess(final MQTT mqtt) throws Exception {
+ final CountDownLatch pingRespReceived = new CountDownLatch(1);
+ //Tracer that counts down the latch when a PINGRESP frame is received
+ mqtt.setTracer(new Tracer() {
+
+ @Override
+ public void onReceive(MQTTFrame frame) {
+ if (frame.messageType() == PINGRESP.TYPE) {
+ pingRespReceived.countDown();
+ }
+ }
+
+ });
+ CallbackConnection callbackConnection = new CallbackConnection(mqtt);
+ BlockingConnection connection = new BlockingConnection(new FutureConnection(callbackConnection));
+ connection.connect();
+ Transport transport = callbackConnection.transport();
+
+ //Send a PINGREQ and wait for the response
+ final MQTTProtocolCodec codec = new MQTTProtocolCodec();
+ codec.setTransport(transport);
+ transport.offer(new PINGREQ().encode());
+
+ //Wait for the response
+ assertTrue(pingRespReceived.await(5, TimeUnit.SECONDS));
+ }
+
}