You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2012/10/24 00:52:06 UTC

svn commit: r1401509 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/mqtt/ test/java/org/apache/activemq/transport/mqtt/

Author: ceposta
Date: Tue Oct 23 22:52:06 2012
New Revision: 1401509

URL: http://svn.apache.org/viewvc?rev=1401509&view=rev
Log:
AMQ-4123 Improve MQTT Inactivity Monitoring

* Fixed broken MQTTTests
* Added more tests around Inactivity Monitoring / MQTT keep alive
* Removed the KeepAliveInfo check from the MQTTInactivityMonitor, clarified the PINGREQ/RESP frames in the inactivity check
* Implemented a grace period for the keep alive on the server side per MQTT spec
* Fixed clientId assignment
* Added "default" keep alive for server-side control of lingering MQTT connections

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1401509&r1=1401508&r2=1401509&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Tue Oct 23 22:52:06 2012
@@ -109,6 +109,11 @@ public class MQTTInactivityMonitor exten
     final void readCheck() {
         int currentCounter = next.getReceiveCounter();
         int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
+
+        // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
+        // should be sufficient to indicate the connection is still alive. If there were random data, or something
+        // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
+        // PINGREQ/RESP explicitly here
         if (inReceive.get() || currentCounter != previousCounter) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("A receive is in progress");
@@ -139,22 +144,7 @@ public class MQTTInactivityMonitor exten
         commandReceived.set(true);
         inReceive.set(true);
         try {
-            if (command.getClass() == KeepAliveInfo.class) {
-                KeepAliveInfo info = (KeepAliveInfo) command;
-                if (info.isResponseRequired()) {
-                    sendLock.lock();
-                    try {
-                        info.setResponseRequired(false);
-                        oneway(info);
-                    } catch (IOException e) {
-                        onException(e);
-                    } finally {
-                        sendLock.unlock();
-                    }
-                }
-            } else {
-                transportListener.onCommand(command);
-            }
+            transportListener.onCommand(command);
         } finally {
             inReceive.set(false);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1401509&r1=1401508&r2=1401509&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Tue Oct 23 22:52:06 2012
@@ -84,6 +84,7 @@ class MQTTProtocolConverter {
 
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
+    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5;
 
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
     private final SessionId sessionId = new SessionId(connectionId, -1);
@@ -106,10 +107,12 @@ class MQTTProtocolConverter {
     private ConnectionInfo connectionInfo = new ConnectionInfo();
     private CONNECT connect;
     private String clientId;
+    private long defaultKeepAlive;
     private final String QOS_PROPERTY_NAME = "QoSPropertyName";
 
     public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
         this.mqttTransport = mqttTransport;
+        this.defaultKeepAlive = 0;
     }
 
     int generateCommandId() {
@@ -142,6 +145,7 @@ class MQTTProtocolConverter {
 
         switch (frame.messageType()) {
             case PINGREQ.TYPE: {
+                LOG.debug("Received a ping from client: " + getClientId());
                 mqttTransport.sendToMQTT(PING_RESP_FRAME);
                 LOG.debug("Sent Ping Response to " + getClientId());
                 break;
@@ -538,19 +542,49 @@ class MQTTProtocolConverter {
         }
     }
 
-    void configureInactivityMonitor(short heartBeat) {
+    void configureInactivityMonitor(short keepAliveSeconds) {
+        MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
+
+        // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
+        // then ignore configuring it because it won't exist
+        if (monitor == null) {
+            return;
+        }
+
+
+        long keepAliveMS = keepAliveSeconds * 1000;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("MQTT Client " + getClientId() + " requests heart beat of  " + keepAliveMS + " ms");
+        }
+
         try {
-            int heartBeatMS = heartBeat * 1000;
-            MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
+
+            long keepAliveMSWithGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
+
+            // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
+            // we'll observe the server-side configured default value (note, no grace period)
+            if (keepAliveMSWithGracePeriod == 0 && defaultKeepAlive > 0) {
+                keepAliveMSWithGracePeriod = defaultKeepAlive;
+            }
+
             monitor.setProtocolConverter(this);
-            monitor.setReadCheckTime(heartBeatMS);
-            monitor.setInitialDelayTime(heartBeatMS);
+            monitor.setReadCheckTime(keepAliveMSWithGracePeriod);
+            monitor.setInitialDelayTime(keepAliveMS);
             monitor.startMonitorThread();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("MQTT Client " + getClientId() +
+                        " established heart beat of  " + keepAliveMSWithGracePeriod +
+                        " ms (" + keepAliveMS + "ms + " + (keepAliveMSWithGracePeriod - keepAliveMS) +
+                        "ms grace period)");
+            }
         } catch (Exception ex) {
             LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
         }
 
-        LOG.debug(getClientId() + " MQTT Connection using heart beat of  " + heartBeat + " secs");
+
+
     }
 
     void handleException(Throwable exception, MQTTFrame command) {
@@ -577,8 +611,9 @@ class MQTTProtocolConverter {
             if (connect != null && connect.clientId() != null) {
                 clientId = connect.clientId().toString();
             }
-        } else {
-            clientId = "";
+            else {
+                clientId = "";
+            }
         }
         return clientId;
     }
@@ -635,4 +670,18 @@ class MQTTProtocolConverter {
         result = result.replace('/', '.');
         return result;
     }
+
+    public long getDefaultKeepAlive() {
+        return defaultKeepAlive;
+    }
+
+    /**
+     * Set the default keep alive time (in milliseconds) that would be used if configured on server side
+     * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
+     *
+     * @param defaultKeepAlive
+     */
+    public void setDefaultKeepAlive(long defaultKeepAlive) {
+        this.defaultKeepAlive = defaultKeepAlive;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?rev=1401509&r1=1401508&r2=1401509&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java Tue Oct 23 22:52:06 2012
@@ -134,5 +134,13 @@ public class MQTTTransportFilter extends
         super.onException(e);
     }
 
+    public long getDefaultKeepAlive() {
+        return protocolConverter != null ? protocolConverter.getDefaultKeepAlive() : -1;
+    }
+
+    public void setDefaultKeepAlive(long defaultHeartBeat) {
+        protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
+    }
+
 
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1401509&r1=1401508&r2=1401509&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Tue Oct 23 22:52:06 2012
@@ -19,10 +19,9 @@ package org.apache.activemq.transport.mq
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Vector;
+import java.util.LinkedList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -34,6 +33,7 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
@@ -58,7 +58,7 @@ import static org.junit.Assert.*;
 public class MQTTTest {
     protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
     protected BrokerService brokerService;
-    protected Vector<Throwable> exceptions = new Vector<Throwable>();
+    protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
     protected int numberOfMessages;
     AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
 
@@ -120,6 +120,8 @@ public class MQTTTest {
 
         latch.await(10, TimeUnit.SECONDS);
         assertEquals(0, latch.getCount());
+        subscribeConnection.disconnect();
+        publisherConnection.disconnect();
     }
 
     @Test
@@ -136,7 +138,7 @@ public class MQTTTest {
         connection.subscribe(topics);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
-            connection.publish("foo2", payload.getBytes(), QoS.AT_MOST_ONCE, false);
+            connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
             Message message = connection.receive(5, TimeUnit.SECONDS);
             assertNotNull("Should get a message", message);
             assertEquals(payload, new String(message.getPayload()));
@@ -294,25 +296,30 @@ public class MQTTTest {
 
         addMQTTConnector(brokerService);
         brokerService.start();
+        TransportConnector mqttConnector = brokerService.getTransportConnectorByScheme("mqtt");
 
         // manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
         // from timing out
-        final AtomicLong exceptionCount = new AtomicLong(0);
-        Transport clientTransport = createManualMQTTClient(exceptionCount);
+        Transport clientTransport = createManualMQTTClient();
         clientTransport.start();
         CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2);
         clientTransport.oneway(connectFrame.encode());
 
+        // wait for broker to register the MQTT connection
+        TimeUnit.SECONDS.sleep(1);
+        assertTrue(mqttConnector.getConnections().size() > 0);
 
-
+        // wait for the inactivity monitor to remove the connection due to inactivity
         TimeUnit.SECONDS.sleep(10);
-        System.out.println("Done waiting");
-        assertEquals("We have elapsed the keep alive, we should have disconnected", 1, exceptionCount.get());
+        assertTrue(mqttConnector.getConnections().size() == 0);
+        assertTrue("Should have seen client transport exception", exceptions.size() > 0);
+
+        clientTransport.stop();
 
     }
 
 
-    private Transport createManualMQTTClient(final AtomicLong exceptionCount) throws IOException, URISyntaxException {
+    private Transport createManualMQTTClient() throws IOException, URISyntaxException {
         Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
                 new URI("tcp://localhost:1883"), null);
         clientTransport.setTransportListener(new TransportListener() {
@@ -322,8 +329,7 @@ public class MQTTTest {
 
             @Override
             public void onException(IOException error) {
-                System.out.println("Exception!!!" + error.getMessage());
-                exceptionCount.incrementAndGet();
+                exceptions.add(error);
             }
 
             @Override
@@ -353,14 +359,70 @@ public class MQTTTest {
         connection.disconnect();
     }
 
+    @Test
+    public void testTurnOffInactivityMonitor()throws Exception{
+        addMQTTConnector(brokerService, "?transport.useInactivityMonitor=false");
+        brokerService.start();
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setKeepAlive((short)2);
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        TimeUnit.SECONDS.sleep(10);
+
+
+        assertTrue("KeepAlive didn't work properly", connection.isConnected());
+
+        connection.disconnect();
+    }
+
+    @Test
+    public void testPingOnMQTTNIO() throws Exception {
+        brokerService.addConnector("mqtt+nio://localhost:1883");
+        brokerService.start();
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setKeepAlive((short)2);
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        TimeUnit.SECONDS.sleep(10);
+
+        assertTrue("KeepAlive didn't work properly", connection.isConnected());
+
+        connection.disconnect();
+    }
+
+    @Test
+    public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
+        // default keep alive in milliseconds
+        brokerService.addConnector("mqtt://localhost:1883?transport.defaultKeepAlive=2000");
+        brokerService.start();
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setKeepAlive((short)0);
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        TimeUnit.SECONDS.sleep(10);
+
+        assertFalse("KeepAlive didn't work properly", connection.isConnected());
+
+    }
+
 
     protected void addMQTTConnector(BrokerService brokerService) throws Exception {
         brokerService.addConnector("mqtt://localhost:1883");
     }
 
+    protected void addMQTTConnector(BrokerService brokerService, String config) throws Exception {
+        brokerService.addConnector("mqtt://localhost:1883" + config);
+    }
+
     protected MQTT createMQTTConnection() throws Exception {
         MQTT mqtt = new MQTT();
         mqtt.setHost("localhost", 1883);
+        // shut off connect retry
+        mqtt.setConnectAttemptsMax(0);
+        mqtt.setReconnectAttemptsMax(0);
         return mqtt;
     }