You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2012/10/24 00:52:06 UTC
svn commit: r1401509 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/mqtt/
test/java/org/apache/activemq/transport/mqtt/
Author: ceposta
Date: Tue Oct 23 22:52:06 2012
New Revision: 1401509
URL: http://svn.apache.org/viewvc?rev=1401509&view=rev
Log:
AMQ-4123 Improve MQTT Inactivity Monitoring
* Fixed broken MQTTTests
* Added more tests around Inactivity Monitoring / MQTT keep alive
* Removed the KeepAliveInfo check from the MQTTInactivityMonitor, clarified the PINGREQ/RESP frames in the inactivity check
* Implemented a grace period for the keep alive on the server side per MQTT spec
* Fixed clientId assignment
* Added "default" keep alive for server-side control of lingering MQTT connections
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1401509&r1=1401508&r2=1401509&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Tue Oct 23 22:52:06 2012
@@ -109,6 +109,11 @@ public class MQTTInactivityMonitor exten
final void readCheck() {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
+
+ // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
+ // should be sufficient to indicate the connection is still alive. If there were random data, or something
+ // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
+ // PINGREQ/RESP explicitly here
if (inReceive.get() || currentCounter != previousCounter) {
if (LOG.isTraceEnabled()) {
LOG.trace("A receive is in progress");
@@ -139,22 +144,7 @@ public class MQTTInactivityMonitor exten
commandReceived.set(true);
inReceive.set(true);
try {
- if (command.getClass() == KeepAliveInfo.class) {
- KeepAliveInfo info = (KeepAliveInfo) command;
- if (info.isResponseRequired()) {
- sendLock.lock();
- try {
- info.setResponseRequired(false);
- oneway(info);
- } catch (IOException e) {
- onException(e);
- } finally {
- sendLock.unlock();
- }
- }
- } else {
- transportListener.onCommand(command);
- }
+ transportListener.onCommand(command);
} finally {
inReceive.set(false);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1401509&r1=1401508&r2=1401509&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Tue Oct 23 22:52:06 2012
@@ -84,6 +84,7 @@ class MQTTProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
+ private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5;
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
@@ -106,10 +107,12 @@ class MQTTProtocolConverter {
private ConnectionInfo connectionInfo = new ConnectionInfo();
private CONNECT connect;
private String clientId;
+ private long defaultKeepAlive;
private final String QOS_PROPERTY_NAME = "QoSPropertyName";
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
this.mqttTransport = mqttTransport;
+ this.defaultKeepAlive = 0;
}
int generateCommandId() {
@@ -142,6 +145,7 @@ class MQTTProtocolConverter {
switch (frame.messageType()) {
case PINGREQ.TYPE: {
+ LOG.debug("Received a ping from client: " + getClientId());
mqttTransport.sendToMQTT(PING_RESP_FRAME);
LOG.debug("Sent Ping Response to " + getClientId());
break;
@@ -538,19 +542,49 @@ class MQTTProtocolConverter {
}
}
- void configureInactivityMonitor(short heartBeat) {
+ void configureInactivityMonitor(short keepAliveSeconds) {
+ MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
+
+ // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
+ // then ignore configuring it because it won't exist
+ if (monitor == null) {
+ return;
+ }
+
+
+ long keepAliveMS = keepAliveSeconds * 1000;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MQTT Client " + getClientId() + " requests heart beat of " + keepAliveMS + " ms");
+ }
+
try {
- int heartBeatMS = heartBeat * 1000;
- MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
+
+ long keepAliveMSWithGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
+
+ // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
+ // we'll observe the server-side configured default value (note, no grace period)
+ if (keepAliveMSWithGracePeriod == 0 && defaultKeepAlive > 0) {
+ keepAliveMSWithGracePeriod = defaultKeepAlive;
+ }
+
monitor.setProtocolConverter(this);
- monitor.setReadCheckTime(heartBeatMS);
- monitor.setInitialDelayTime(heartBeatMS);
+ monitor.setReadCheckTime(keepAliveMSWithGracePeriod);
+ monitor.setInitialDelayTime(keepAliveMS);
monitor.startMonitorThread();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MQTT Client " + getClientId() +
+ " established heart beat of " + keepAliveMSWithGracePeriod +
+ " ms (" + keepAliveMS + "ms + " + (keepAliveMSWithGracePeriod - keepAliveMS) +
+ "ms grace period)");
+ }
} catch (Exception ex) {
LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
}
- LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs");
+
+
}
void handleException(Throwable exception, MQTTFrame command) {
@@ -577,8 +611,9 @@ class MQTTProtocolConverter {
if (connect != null && connect.clientId() != null) {
clientId = connect.clientId().toString();
}
- } else {
- clientId = "";
+ else {
+ clientId = "";
+ }
}
return clientId;
}
@@ -635,4 +670,18 @@ class MQTTProtocolConverter {
result = result.replace('/', '.');
return result;
}
+
+ public long getDefaultKeepAlive() {
+ return defaultKeepAlive;
+ }
+
+ /**
+ * Set the default keep alive time (in milliseconds) that would be used if configured on server side
+ * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
+ *
+ * @param defaultKeepAlive
+ */
+ public void setDefaultKeepAlive(long defaultKeepAlive) {
+ this.defaultKeepAlive = defaultKeepAlive;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?rev=1401509&r1=1401508&r2=1401509&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java Tue Oct 23 22:52:06 2012
@@ -134,5 +134,13 @@ public class MQTTTransportFilter extends
super.onException(e);
}
+ public long getDefaultKeepAlive() {
+ return protocolConverter != null ? protocolConverter.getDefaultKeepAlive() : -1;
+ }
+
+ public void setDefaultKeepAlive(long defaultHeartBeat) {
+ protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
+ }
+
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1401509&r1=1401508&r2=1401509&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Tue Oct 23 22:52:06 2012
@@ -19,10 +19,9 @@ package org.apache.activemq.transport.mq
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Vector;
+import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -34,6 +33,7 @@ import org.apache.activemq.ActiveMQConne
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@@ -58,7 +58,7 @@ import static org.junit.Assert.*;
public class MQTTTest {
protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
protected BrokerService brokerService;
- protected Vector<Throwable> exceptions = new Vector<Throwable>();
+ protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
protected int numberOfMessages;
AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
@@ -120,6 +120,8 @@ public class MQTTTest {
latch.await(10, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
+ subscribeConnection.disconnect();
+ publisherConnection.disconnect();
}
@Test
@@ -136,7 +138,7 @@ public class MQTTTest {
connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
- connection.publish("foo2", payload.getBytes(), QoS.AT_MOST_ONCE, false);
+ connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
Message message = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message.getPayload()));
@@ -294,25 +296,30 @@ public class MQTTTest {
addMQTTConnector(brokerService);
brokerService.start();
+ TransportConnector mqttConnector = brokerService.getTransportConnectorByScheme("mqtt");
// manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
// from timing out
- final AtomicLong exceptionCount = new AtomicLong(0);
- Transport clientTransport = createManualMQTTClient(exceptionCount);
+ Transport clientTransport = createManualMQTTClient();
clientTransport.start();
CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2);
clientTransport.oneway(connectFrame.encode());
+ // wait for broker to register the MQTT connection
+ TimeUnit.SECONDS.sleep(1);
+ assertTrue(mqttConnector.getConnections().size() > 0);
-
+ // wait for the inactivity monitor to remove the connection due to inactivity
TimeUnit.SECONDS.sleep(10);
- System.out.println("Done waiting");
- assertEquals("We have elapsed the keep alive, we should have disconnected", 1, exceptionCount.get());
+ assertTrue(mqttConnector.getConnections().size() == 0);
+ assertTrue("Should have seen client transport exception", exceptions.size() > 0);
+
+ clientTransport.stop();
}
- private Transport createManualMQTTClient(final AtomicLong exceptionCount) throws IOException, URISyntaxException {
+ private Transport createManualMQTTClient() throws IOException, URISyntaxException {
Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
new URI("tcp://localhost:1883"), null);
clientTransport.setTransportListener(new TransportListener() {
@@ -322,8 +329,7 @@ public class MQTTTest {
@Override
public void onException(IOException error) {
- System.out.println("Exception!!!" + error.getMessage());
- exceptionCount.incrementAndGet();
+ exceptions.add(error);
}
@Override
@@ -353,14 +359,70 @@ public class MQTTTest {
connection.disconnect();
}
+ @Test
+ public void testTurnOffInactivityMonitor()throws Exception{
+ addMQTTConnector(brokerService, "?transport.useInactivityMonitor=false");
+ brokerService.start();
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive((short)2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ TimeUnit.SECONDS.sleep(10);
+
+
+ assertTrue("KeepAlive didn't work properly", connection.isConnected());
+
+ connection.disconnect();
+ }
+
+ @Test
+ public void testPingOnMQTTNIO() throws Exception {
+ brokerService.addConnector("mqtt+nio://localhost:1883");
+ brokerService.start();
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive((short)2);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ TimeUnit.SECONDS.sleep(10);
+
+ assertTrue("KeepAlive didn't work properly", connection.isConnected());
+
+ connection.disconnect();
+ }
+
+ @Test
+ public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
+ // default keep alive in milliseconds
+ brokerService.addConnector("mqtt://localhost:1883?transport.defaultKeepAlive=2000");
+ brokerService.start();
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive((short)0);
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ TimeUnit.SECONDS.sleep(10);
+
+ assertFalse("KeepAlive didn't work properly", connection.isConnected());
+
+ }
+
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("mqtt://localhost:1883");
}
+ protected void addMQTTConnector(BrokerService brokerService, String config) throws Exception {
+ brokerService.addConnector("mqtt://localhost:1883" + config);
+ }
+
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
+ // shut off connect retry
+ mqtt.setConnectAttemptsMax(0);
+ mqtt.setReconnectAttemptsMax(0);
return mqtt;
}