You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2012/10/23 00:01:08 UTC

svn commit: r1401102 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Author: ceposta
Date: Mon Oct 22 22:01:08 2012
New Revision: 1401102

URL: http://svn.apache.org/viewvc?rev=1401102&view=rev
Log:
Tests for https://issues.apache.org/jira/browse/AMQ-4123

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1401102&r1=1401101&r2=1401102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Mon Oct 22 22:01:08 2012
@@ -16,34 +16,43 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.net.SocketFactory;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.ByteSequence;
+import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.codec.CONNECT;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
 
 
 public class MQTTTest {
@@ -280,6 +289,70 @@ public class MQTTTest {
         connection.disconnect();
     }
 
+    @Test
+    public void testInactivityTimeoutDisconnectsClient() throws Exception{
+
+        addMQTTConnector(brokerService);
+        brokerService.start();
+
+        // manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
+        // from timing out
+        final AtomicLong exceptionCount = new AtomicLong(0);
+        Transport clientTransport = createManualMQTTClient(exceptionCount);
+        clientTransport.start();
+        CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2);
+        clientTransport.oneway(connectFrame.encode());
+
+
+
+        TimeUnit.SECONDS.sleep(10);
+        System.out.println("Done waiting");
+        assertEquals("We have elapsed the keep alive, we should have disconnected", 1, exceptionCount.get());
+
+    }
+
+
+    private Transport createManualMQTTClient(final AtomicLong exceptionCount) throws IOException, URISyntaxException {
+        Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
+                new URI("tcp://localhost:1883"), null);
+        clientTransport.setTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+            }
+
+            @Override
+            public void onException(IOException error) {
+                System.out.println("Exception!!!" + error.getMessage());
+                exceptionCount.incrementAndGet();
+            }
+
+            @Override
+            public void transportInterupted() {
+            }
+
+            @Override
+            public void transportResumed() {
+            }
+        });
+        return clientTransport;
+    }
+
+    @Test
+    public void testPingKeepsInactivityMonitorAlive() throws Exception {
+        addMQTTConnector(brokerService);
+        brokerService.start();
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setKeepAlive((short)2);
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        TimeUnit.SECONDS.sleep(10);
+
+        assertTrue("KeepAlive didn't work properly", connection.isConnected());
+
+        connection.disconnect();
+    }
+
 
     protected void addMQTTConnector(BrokerService brokerService) throws Exception {
         brokerService.addConnector("mqtt://localhost:1883");