You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/05 18:15:02 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6343

Repository: activemq
Updated Branches:
  refs/heads/master 96494f74c -> bd442a338


https://issues.apache.org/jira/browse/AMQ-6343

On MQTT Websocket close, a LWT message will be properly sent if
configured and a disconnect packet was not received


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bd442a33
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bd442a33
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bd442a33

Branch: refs/heads/master
Commit: bd442a3388b0f127c8f8b9fad5e4888b77bb42c3
Parents: 96494f7
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 5 17:47:49 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Jul 5 18:13:45 2016 +0000

----------------------------------------------------------------------
 .../transport/ws/jetty9/MQTTSocket.java         |   9 +
 .../activemq/transport/ws/MQTTWSConnection.java |   8 +-
 .../transport/ws/MQTTWSTransportWillTest.java   | 251 +++++++++++++++++++
 .../transport/ws/WSTransportTestSupport.java    |   5 +-
 .../transport/mqtt/MQTTProtocolConverter.java   |   3 +-
 5 files changed, 269 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bd442a33/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
index 53bad07..d8c248d 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws.jetty9;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.transport.ws.AbstractMQTTSocket;
 import org.apache.activemq.util.ByteSequence;
@@ -36,6 +37,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
 
     private final int ORDERLY_CLOSE_TIMEOUT = 10;
     private Session session;
+    final AtomicBoolean receivedDisconnect = new AtomicBoolean();
 
     public MQTTSocket(String remoteAddress) {
         super(remoteAddress);
@@ -71,6 +73,9 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
         try {
             receiveCounter += length;
             MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
+            if (frame.messageType() == DISCONNECT.TYPE) {
+                receivedDisconnect.set(true);
+            }
             getProtocolConverter().onMQTTCommand(frame);
         } catch (Exception e) {
             onException(IOExceptionSupport.create(e));
@@ -84,6 +89,10 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
         try {
             if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
                 LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1);
+                //Check if we received a disconnect packet before closing
+                if (!receivedDisconnect.get()) {
+                    getProtocolConverter().onTransportError();
+                }
                 getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd442a33/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
index c4e8c47..f14da23 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
@@ -85,14 +85,16 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
     }
 
     public void connect(String clientId) throws Exception {
-        checkConnected();
-
         CONNECT command = new CONNECT();
-
         command.clientId(new UTF8Buffer(clientId));
         command.cleanSession(false);
         command.version(3);
         command.keepAlive((short) 0);
+        connect(command);
+    }
+
+    public void connect(CONNECT command) throws Exception {
+        checkConnected();
 
         ByteSequence payload = wireFormat.marshal(command.encode());
         connection.getRemote().sendBytes(ByteBuffer.wrap(payload.data));

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd442a33/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportWillTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportWillTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportWillTest.java
new file mode 100644
index 0000000..6d05ab8
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportWillTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * This shows that last will and testament messages work with MQTT over WS.
+ * This test is modeled after org.apache.activemq.transport.mqtt.MQTTWillTest
+ */
+@RunWith(Parameterized.class)
+public class MQTTWSTransportWillTest extends WSTransportTestSupport {
+
+    protected WebSocketClient wsClient;
+    protected MQTTWSConnection wsMQTTConnection1;
+    protected MQTTWSConnection wsMQTTConnection2;
+    protected ClientUpgradeRequest request;
+
+    private String willTopic = "willTopic";
+    private String payload = "last will";
+    private boolean closeWithDisconnect;
+
+    //Test both with a proper disconnect and without
+    @Parameters(name="closeWithDisconnect={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false}
+            });
+    }
+
+    public MQTTWSTransportWillTest(boolean closeWithDisconnect) {
+        this.closeWithDisconnect = closeWithDisconnect;
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        //turn off advisory support
+        broker = createBroker(true, false);
+
+        wsClient = new WebSocketClient(new SslContextFactory(true));
+        wsClient.start();
+
+        request = new ClientUpgradeRequest();
+        request.setSubProtocols("mqttv3.1");
+
+        wsMQTTConnection1 = new MQTTWSConnection();
+        wsMQTTConnection2 = new MQTTWSConnection();
+
+        wsClient.connect(wsMQTTConnection1, wsConnectUri, request);
+        if (!wsMQTTConnection1.awaitConnection(30, TimeUnit.SECONDS)) {
+            throw new IOException("Could not connect to MQTT WS endpoint");
+        }
+
+        wsClient.connect(wsMQTTConnection2, wsConnectUri, request);
+        if (!wsMQTTConnection2.awaitConnection(30, TimeUnit.SECONDS)) {
+            throw new IOException("Could not connect to MQTT WS endpoint");
+        }
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (wsMQTTConnection1 != null) {
+            wsMQTTConnection1.close();
+            wsMQTTConnection1 = null;
+        }
+        if (wsMQTTConnection2 != null) {
+            wsMQTTConnection2.close();
+            wsMQTTConnection2 = null;
+        }
+        wsClient.stop();
+        wsClient = null;
+        super.tearDown();
+    }
+
+    @Test(timeout = 60000)
+    public void testWill() throws Exception {
+
+        //connect with will retain false
+        CONNECT command = getWillConnectCommand(false);
+
+        //connect both connections
+        wsMQTTConnection1.connect(command);
+        wsMQTTConnection2.connect();
+
+        //Subscribe to topics
+        SUBSCRIBE subscribe = new SUBSCRIBE();
+        subscribe.topics(new Topic[] {new Topic("#", QoS.EXACTLY_ONCE) });
+        wsMQTTConnection2.sendFrame(subscribe.encode());
+        wsMQTTConnection2.receive(5, TimeUnit.SECONDS);
+
+        //Test message send/receive
+        wsMQTTConnection1.sendFrame(getTestMessage((short) 125).encode());
+        assertMessageReceived(wsMQTTConnection2);
+
+        //close the first connection without sending a proper disconnect frame first
+        //if closeWithDisconnect is false
+        if (closeWithDisconnect) {
+            wsMQTTConnection1.disconnect();
+        }
+        wsMQTTConnection1.close();
+
+        //Make sure LWT message is not received
+        if (closeWithDisconnect) {
+            assertNull(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
+        //make sure LWT is received
+        } else {
+            assertWillTopicReceived(wsMQTTConnection2);
+        }
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testRetainWillMessage() throws Exception {
+
+        //create connection with will retain true
+        CONNECT command = getWillConnectCommand(true);
+
+        wsMQTTConnection1.connect(command);
+        wsMQTTConnection2.connect();
+
+        //set to at most once to test will retain
+        SUBSCRIBE subscribe = new SUBSCRIBE();
+        subscribe.topics(new Topic[] {new Topic("#", QoS.AT_MOST_ONCE) });
+        wsMQTTConnection2.sendFrame(subscribe.encode());
+        wsMQTTConnection2.receive(5, TimeUnit.SECONDS);
+
+        //Test message send/receive
+        PUBLISH pub = getTestMessage((short) 127);
+        wsMQTTConnection1.sendFrame(pub.encode());
+        assertMessageReceived(wsMQTTConnection2);
+        PUBACK ack = new PUBACK();
+        ack.messageId(pub.messageId());
+        wsMQTTConnection2.sendFrame(ack.encode());
+
+        //Properly close connection 2 and improperly close connection 1 for LWT test
+        wsMQTTConnection2.disconnect();
+        wsMQTTConnection2.close();
+        Thread.sleep(1000);
+        //close the first connection without sending a proper disconnect frame first
+        //if closeWithoutDisconnect is false
+        if (closeWithDisconnect) {
+            wsMQTTConnection1.disconnect();
+        }
+        wsMQTTConnection1.close();
+        Thread.sleep(1000);
+
+        //Do the reconnect of the websocket after close
+        wsMQTTConnection2 = new MQTTWSConnection();
+        wsClient.connect(wsMQTTConnection2, wsConnectUri, request);
+        if (!wsMQTTConnection2.awaitConnection(30, TimeUnit.SECONDS)) {
+            throw new IOException("Could not connect to MQTT WS endpoint");
+        }
+
+
+        //Make sure the will message is received on reconnect
+        wsMQTTConnection2.connect();
+        wsMQTTConnection2.sendFrame(subscribe.encode());
+        wsMQTTConnection2.receive(5, TimeUnit.SECONDS);
+
+        //Make sure LWT message not received
+        if (closeWithDisconnect) {
+            assertNull(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
+        //make sure LWT is received
+        } else {
+            assertWillTopicReceived(wsMQTTConnection2);
+        }
+    }
+
+    private PUBLISH getTestMessage(short id) {
+        PUBLISH publish = new PUBLISH();
+        publish.dup(false);
+        publish.messageId(id);
+        publish.qos(QoS.AT_LEAST_ONCE);
+        publish.payload(new Buffer("hello world".getBytes()));
+        publish.topicName(new UTF8Buffer("test"));
+
+        return publish;
+    }
+
+    private CONNECT getWillConnectCommand(boolean willRetain) {
+        CONNECT command = new CONNECT();
+        command.clientId(new UTF8Buffer("clientId"));
+        command.cleanSession(false);
+        command.version(3);
+        command.keepAlive((short) 0);
+        command.willMessage(new UTF8Buffer(payload));
+        command.willQos(QoS.AT_LEAST_ONCE);
+        command.willTopic(new UTF8Buffer(willTopic));
+        command.willRetain(willRetain);
+
+        return command;
+    }
+
+    private void assertMessageReceived(MQTTWSConnection wsMQTTConnection2) throws Exception {
+        PUBLISH msg = new PUBLISH();
+        msg.decode(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
+        assertNotNull(msg);
+        assertEquals("hello world", msg.payload().ascii().toString());
+        assertEquals("test", msg.topicName().toString());
+    }
+
+    private void assertWillTopicReceived(MQTTWSConnection wsMQTTConnection2) throws Exception {
+        PUBLISH willMsg = new PUBLISH();
+        willMsg.decode(wsMQTTConnection2.receive(5, TimeUnit.SECONDS));
+        assertNotNull(willMsg);
+        assertEquals(payload, willMsg.payload().ascii().toString());
+        assertEquals(willTopic,  willMsg.topicName().toString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd442a33/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
index c83f24d..fba6e8a 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
@@ -55,7 +55,7 @@ public class WSTransportTestSupport {
     @Before
     public void setUp() throws Exception {
         LOG.info("========== Starting test: {} ==========", name.getMethodName());
-        broker = createBroker(true);
+        broker = createBroker(true, true);
     }
 
     @After
@@ -89,7 +89,7 @@ public class WSTransportTestSupport {
 
     }
 
-    protected BrokerService createBroker(boolean deleteMessages) throws Exception {
+    protected BrokerService createBroker(boolean deleteMessages, boolean advisorySupport) throws Exception {
 
         BrokerService broker = new BrokerService();
 
@@ -103,6 +103,7 @@ public class WSTransportTestSupport {
 
         wsConnectUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectURI();
 
+        broker.setAdvisorySupport(advisorySupport);
         broker.setUseJmx(true);
         broker.getManagementContext().setCreateConnector(false);
         broker.setPersistent(isPersistent());

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd442a33/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 75ec9b8..e693327 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -366,8 +366,7 @@ public class MQTTProtocolConverter {
     }
 
     void onMQTTDisconnect() throws MQTTProtocolException {
-        if (connected.get()) {
-            connected.set(false);
+        if (connected.compareAndSet(true, false)) {
             sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
             sendToActiveMQ(new ShutdownInfo(), null);
         }