You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/06 16:01:44 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6345

Repository: activemq
Updated Branches:
  refs/heads/master c02bc6484 -> 6dacef1c9


https://issues.apache.org/jira/browse/AMQ-6345

The MQTT transport will now throw an exception if a PINGREQ is sent to
the broker if a CONNECT packet has not been received first as the spec
says CONNECT must be the first packet sent.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6dacef1c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6dacef1c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6dacef1c

Branch: refs/heads/master
Commit: 6dacef1c9552edbad656c31d784179c2cd179b2e
Parents: c02bc64
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Jul 6 15:59:25 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Jul 6 15:59:25 2016 +0000

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   |   1 +
 .../activemq/transport/mqtt/MQTTTest.java       | 122 +++++++++++++++++++
 2 files changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6dacef1c/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index e693327..23ca5fa 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -196,6 +196,7 @@ public class MQTTProtocolConverter {
         switch (frame.messageType()) {
             case PINGREQ.TYPE:
                 LOG.debug("Received a ping from client: " + getClientId());
+                checkConnected();
                 sendToMQTT(PING_RESP_FRAME);
                 LOG.debug("Sent Ping Response to " + getClientId());
                 break;

http://git-wip-us.apache.org/repos/asf/activemq/blob/6dacef1c/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 5e28b2a..227ade6 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.EOFException;
+import java.lang.reflect.Method;
 import java.net.ProtocolException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -56,13 +58,21 @@ import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.fusesource.hawtdispatch.transport.Transport;
 import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.Callback;
+import org.fusesource.mqtt.client.CallbackConnection;
+import org.fusesource.mqtt.client.FutureConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.client.Tracer;
 import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.MQTTProtocolCodec;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
 import org.fusesource.mqtt.codec.PUBLISH;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -1961,4 +1971,116 @@ public class MQTTTest extends MQTTTestSupport {
 
         connection.disconnect();
     }
+
+    @Test(timeout = 15 * 1000, expected=EOFException.class)
+    public void testPingReqWithoutConnectFail31() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("clientId");
+        mqtt.setVersion("3.1");
+        testPingReqWithoutConnectFail(mqtt);
+    }
+
+    @Test(timeout = 15 * 1000, expected=EOFException.class)
+    public void testPingReqWithoutConnectFail311() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("clientId");
+        mqtt.setVersion("3.1.1");
+        testPingReqWithoutConnectFail(mqtt);
+    }
+
+    @Test(timeout = 15 * 1000)
+    public void testPingReqConnectSuccess31() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("clientId");
+        mqtt.setVersion("3.1");
+        testPingReqConnectSuccess(mqtt);
+    }
+
+    @Test(timeout = 15 * 1000)
+    public void testPingReqConnectSuccess311() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("clientId");
+        mqtt.setVersion("3.1.1");
+        testPingReqConnectSuccess(mqtt);
+    }
+
+    private void testPingReqWithoutConnectFail(final  MQTT mqtt) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Transport> transport = new AtomicReference<>();
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Callback<Transport> con = new Callback<Transport>() {
+
+            @Override
+            public void onSuccess(Transport value) {
+                transport.set(value);
+                latch.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable value) {
+                error.set(value);
+                latch.countDown();
+            }
+        };
+
+        //Connect to the transport by using the createTransport method with a custom callback
+        //This will ensure that we connect without sending a CONNECT packet for testing
+        //and that we won't receive automatically
+        CallbackConnection connection = new CallbackConnection(mqtt);
+        Method createTransportMethod = connection.getClass().getDeclaredMethod("createTransport", Callback.class);
+        createTransportMethod.setAccessible(true);
+        createTransportMethod.invoke(connection, con);
+        latch.await();
+
+        //Make sure no error on connect
+        if (error.get() != null) {
+            LOG.error(error.get().getMessage(), error.get());
+            fail(error.get().getMessage());
+        }
+
+        //Send a PINGREQ without a connect packet first
+        final MQTTProtocolCodec codec = new MQTTProtocolCodec();
+        codec.setTransport(transport.get());
+        transport.get().offer(new PINGREQ().encode());
+
+        //Protocol should throw an exception since we never sent a CONNECT
+        Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                //Wait for exception to be thrown
+                codec.read();
+                return false;
+            }
+        }, 5000, 100);
+
+    }
+
+    private void testPingReqConnectSuccess(final MQTT mqtt) throws Exception {
+        final CountDownLatch pingRespReceived = new CountDownLatch(1);
+        //Tracer to assert we received the response by waiting for it
+        mqtt.setTracer(new Tracer() {
+
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                if (frame.messageType() == PINGRESP.TYPE) {
+                    pingRespReceived.countDown();
+                }
+            }
+
+        });
+        CallbackConnection callbackConnection = new CallbackConnection(mqtt);
+        BlockingConnection connection = new BlockingConnection(new FutureConnection(callbackConnection));
+        connection.connect();
+        Transport transport =  callbackConnection.transport();
+
+        //SEND a PINGREQ and wait for the response
+        final MQTTProtocolCodec codec = new MQTTProtocolCodec();
+        codec.setTransport(transport);
+        transport.offer(new PINGREQ().encode());
+
+        //Wait for the response
+        assertTrue(pingRespReceived.await(5, TimeUnit.SECONDS));
+    }
+
 }