You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/01/08 21:33:17 UTC
svn commit: r1430496 - in /activemq/trunk/activemq-mqtt/src:
main/java/org/apache/activemq/transport/mqtt/
test/java/org/apache/activemq/transport/mqtt/
Author: rajdavies
Date: Tue Jan 8 20:33:16 2013
New Revision: 1430496
URL: http://svn.apache.org/viewvc?rev=1430496&view=rev
Log:
Refactored to make it easier to test with multiple MQTT client providers
Added:
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
- copied, changed from r1429809, activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java (with props)
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java (with props)
Modified:
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1430496&r1=1430495&r2=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Tue Jan 8 20:33:16 2013
@@ -305,7 +305,6 @@ class MQTTProtocolConverter {
//by default subscribers are persistent
consumerInfo.setSubscriptionName(connect.clientId().toString());
}
-
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
subscriptionsByConsumerId.put(id, mqttSubscription);
@@ -364,13 +363,13 @@ class MQTTProtocolConverter {
if (sub != null) {
MessageAck ack = sub.createMessageAck(md);
PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
- if (ack != null && sub.expectAck()) {
+ if (ack != null && sub.expectAck(publish)) {
synchronized (consumerAcks) {
consumerAcks.put(publish.messageId(), ack);
}
}
getMQTTTransport().sendToMQTT(publish.encode());
- if (ack != null && !sub.expectAck()) {
+ if (ack != null && !sub.expectAck(publish)) {
getMQTTTransport().sendToActiveMQ(ack);
}
}
@@ -683,11 +682,10 @@ class MQTTProtocolConverter {
/**
* Set the default keep alive time (in milliseconds) that would be used if configured on server side
* and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
- *
- * @param defaultKeepAlive the keepAlive in milliseconds
+ * @param keepAlive the keepAlive in milliseconds
*/
- public void setDefaultKeepAlive(long defaultKeepAlive) {
- this.defaultKeepAlive = defaultKeepAlive;
+ public void setDefaultKeepAlive(long keepAlive) {
+ this.defaultKeepAlive = keepAlive;
}
public int getActiveMQSubscriptionPrefetch() {
Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java?rev=1430496&r1=1430495&r2=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java (original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java Tue Jan 8 20:33:16 2013
@@ -56,16 +56,12 @@ class MQTTSubscription {
return publish;
}
- public boolean expectAck() {
- return qos != QoS.AT_MOST_ONCE;
- }
-
- public void setDestination(ActiveMQDestination destination) {
- this.destination = destination;
- }
-
- public ActiveMQDestination getDestination() {
- return destination;
+ public boolean expectAck(PUBLISH publish) {
+ QoS publishQoS = publish.qos();
+ if (publishQoS.compareTo(this.qos) > 0){
+ publishQoS = this.qos;
+ }
+ return !publishQoS.equals(QoS.AT_MOST_ONCE);
}
public ConsumerInfo getConsumerInfo() {
Copied: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java (from r1429809, activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java?p2=activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java&p1=activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java&r1=1429809&r2=1430496&rev=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java Tue Jan 8 20:33:16 2013
@@ -18,8 +18,6 @@ package org.apache.activemq.transport.mq
import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.security.ProtectionDomain;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
@@ -29,42 +27,31 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.net.SocketFactory;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.Wait;
-import org.fusesource.hawtbuf.UTF8Buffer;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.Message;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.codec.CONNECT;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
import static org.junit.Assert.*;
-public class MQTTTest {
+public abstract class AbstractMQTTTest {
+ protected TransportConnector mqttConnector;
+
+ public static final int AT_MOST_ONCE =0;
+ public static final int AT_LEAST_ONCE = 1;
+ public static final int EXACTLY_ONCE =2;
public File basedir() throws IOException {
ProtectionDomain protectionDomain = getClass().getProtectionDomain();
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
}
- protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
protected BrokerService brokerService;
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
protected int numberOfMessages;
@@ -93,21 +80,20 @@ public class MQTTTest {
public void testSendAndReceiveMQTT() throws Exception {
addMQTTConnector();
brokerService.start();
- MQTT mqtt = createMQTTConnection();
- final BlockingConnection subscribeConnection = mqtt.blockingConnection();
- subscribeConnection.connect();
- Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
- Topic[] topics = {topic};
- subscribeConnection.subscribe(topics);
+ final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+ initializeConnection(subscriptionProvider);
+
+
+ subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
+
final CountDownLatch latch = new CountDownLatch(numberOfMessages);
Thread thread = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < numberOfMessages; i++){
try {
- Message message = subscribeConnection.receive(5, TimeUnit.SECONDS);
- assertNotNull("Should get a message", message);
- message.ack();
+ byte[] payload = subscriptionProvider.receive(10000);
+ assertNotNull("Should get a message", payload);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
@@ -119,92 +105,137 @@ public class MQTTTest {
});
thread.start();
- BlockingConnection publisherConnection = mqtt.blockingConnection();
- publisherConnection.connect();
+ final MQTTClientProvider publishProvider = getMQTTClientProvider();
+ initializeConnection(publishProvider);
+
for (int i = 0; i < numberOfMessages; i++){
String payload = "Message " + i;
- publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
+ publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
}
latch.await(10, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
- subscribeConnection.disconnect();
- publisherConnection.disconnect();
+ subscriptionProvider.disconnect();
+ publishProvider.disconnect();
}
@Test
- public void testSendAndReceiveAtMostOnce() throws Exception {
+ public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
+ /**
+ * Although subscribing with EXACTLY ONCE, the message gets published
+ * with AT_MOST_ONCE - in MQTT the QoS is always determined by the message
+ * as published - not the wish of the subscriber
+ */
addMQTTConnector();
brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive(Short.MAX_VALUE);
- BlockingConnection connection = mqtt.blockingConnection();
- connection.connect();
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo",EXACTLY_ONCE);
+ for (int i = 0; i < numberOfMessages; i++) {
+ String payload = "Test Message: " + i;
+ provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
+ byte[] message = provider.receive(5000);
+ assertNotNull("Should get a message", message);
+ assertEquals(payload, new String(message));
+ }
+ provider.disconnect();
+ }
- Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
- connection.subscribe(topics);
+ @Test
+ public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
+ addMQTTConnector();
+ brokerService.start();
+
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo",EXACTLY_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
- connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
- Message message = connection.receive(5, TimeUnit.SECONDS);
+ provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+ byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
- assertEquals(payload, new String(message.getPayload()));
+ assertEquals(payload, new String(message));
}
- connection.disconnect();
+ provider.disconnect();
}
@Test
- public void testSendAndReceiveAtLeastOnce() throws Exception {
+ public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
addMQTTConnector();
brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive(Short.MAX_VALUE);
- BlockingConnection connection = mqtt.blockingConnection();
- connection.connect();
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo",AT_MOST_ONCE);
+ for (int i = 0; i < numberOfMessages; i++) {
+ String payload = "Test Message: " + i;
+ provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+ byte[] message = provider.receive(5000);
+ assertNotNull("Should get a message", message);
+ assertEquals(payload, new String(message));
+ }
+ provider.disconnect();
+ }
- Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
- connection.subscribe(topics);
+
+ @Test
+ public void testSendAndReceiveAtMostOnce() throws Exception {
+ addMQTTConnector();
+ brokerService.start();
+
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo",AT_MOST_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
- connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
- Message message = connection.receive(5, TimeUnit.SECONDS);
+ provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
+ byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
- message.ack();
- assertEquals(payload, new String(message.getPayload()));
+ assertEquals(payload, new String(message));
}
- connection.disconnect();
+ provider.disconnect();
}
@Test
- public void testSendAndReceiveExactlyOnce() throws Exception {
+ public void testSendAndReceiveAtLeastOnce() throws Exception {
addMQTTConnector();
brokerService.start();
- MQTT publisher = createMQTTConnection();
- BlockingConnection pubConnection = publisher.blockingConnection();
- pubConnection.connect();
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
+ provider.subscribe("foo",AT_LEAST_ONCE);
+ for (int i = 0; i < numberOfMessages; i++) {
+ String payload = "Test Message: " + i;
+ provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+ byte[] message = provider.receive(5000);
+ assertNotNull("Should get a message", message);
+ assertEquals(payload, new String(message));
+ }
+ provider.disconnect();
+ }
+
+ @Test
+ public void testSendAndReceiveExactlyOnce() throws Exception {
+ addMQTTConnector();
+ brokerService.start();
+ final MQTTClientProvider publisher = getMQTTClientProvider();
+ initializeConnection(publisher);
- MQTT subscriber = createMQTTConnection();
- BlockingConnection subConnection = subscriber.blockingConnection();
+ final MQTTClientProvider subscriber = getMQTTClientProvider();
+ initializeConnection(subscriber);
- subConnection.connect();
- Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
- subConnection.subscribe(topics);
+ subscriber.subscribe("foo",EXACTLY_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
- pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
- Message message = subConnection.receive(5, TimeUnit.SECONDS);
+ publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
+ byte[] message = subscriber.receive(5000);
assertNotNull("Should get a message", message);
- LOG.debug(payload);
- message.ack();
- //System.err.println("Sent " + payload + " GOT " + new String(message.getPayload()));
- assertEquals(payload, new String(message.getPayload()));
+ assertEquals(payload, new String(message));
}
- subConnection.disconnect();
- pubConnection.disconnect();
+ subscriber.disconnect();
+ publisher.disconnect();
}
@Test
@@ -216,27 +247,22 @@ public class MQTTTest {
addMQTTConnector();
brokerService.start();
- MQTT publisher = createMQTTConnection();
- BlockingConnection pubConnection = publisher.blockingConnection();
-
- pubConnection.connect();
+ final MQTTClientProvider publisher = getMQTTClientProvider();
+ initializeConnection(publisher);
- MQTT subscriber = createMQTTConnection();
- BlockingConnection subConnection = subscriber.blockingConnection();
+ final MQTTClientProvider subscriber = getMQTTClientProvider();
+ initializeConnection(subscriber);
- subConnection.connect();
-
- Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
- subConnection.subscribe(topics);
+ subscriber.subscribe("foo",AT_LEAST_ONCE);
for (int i = 0; i < 10; i++) {
- pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
- Message message = subConnection.receive(5, TimeUnit.SECONDS);
+ publisher.publish("foo", payload, AT_LEAST_ONCE);
+ byte[] message = subscriber.receive(5000);
assertNotNull("Should get a message", message);
- message.ack();
- assertArrayEquals(payload, message.getPayload());
+
+ assertArrayEquals(payload, message);
}
- subConnection.disconnect();
- pubConnection.disconnect();
+ subscriber.disconnect();
+ publisher.disconnect();
}
@@ -245,10 +271,11 @@ public class MQTTTest {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
- MQTT mqtt = createMQTTConnection();
- BlockingConnection connection = mqtt.blockingConnection();
+
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
final String DESTINATION_NAME = "foo.*";
- connection.connect();
+
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
@@ -258,7 +285,7 @@ public class MQTTTest {
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
- connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+ provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
assertNotNull("Should get a message", message);
ByteSequence bs = message.getContent();
@@ -267,7 +294,7 @@ public class MQTTTest {
activeMQConnection.close();
- connection.disconnect();
+ provider.disconnect();
}
@Test
@@ -275,10 +302,8 @@ public class MQTTTest {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive(Short.MAX_VALUE);
- BlockingConnection connection = mqtt.blockingConnection();
- connection.connect();
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
@@ -286,141 +311,20 @@ public class MQTTTest {
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
MessageProducer producer = s.createProducer(jmsTopic);
- Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
- connection.subscribe(topics);
+ provider.subscribe("foo/+",AT_MOST_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "This is Test Message: " + i;
TextMessage sendMessage = s.createTextMessage(payload);
producer.send(sendMessage);
- Message message = connection.receive(5, TimeUnit.SECONDS);
+ byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
- message.ack();
- assertEquals(payload, new String(message.getPayload()));
- }
- connection.disconnect();
- }
-
- public void testInactivityTimeoutDisconnectsClient() throws Exception{
-
- addMQTTConnector();
- brokerService.start();
-
- // manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
- // from timing out
- Transport clientTransport = createManualMQTTClient();
- clientTransport.start();
- CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2);
- clientTransport.oneway(connectFrame.encode());
-
- // wait for broker to register the MQTT connection
- assertTrue("MQTT Connection should be registered.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return mqttConnector.getConnections().size() > 0;
- }
- }));
-
- // wait for broker to time out the MQTT connection due to inactivity
- assertTrue("MQTT Connection should be timed out.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return mqttConnector.getConnections().size() == 0;
- }
- }));
-
- assertTrue("Should have seen client transport exception", exceptions.size() > 0);
-
- clientTransport.stop();
- }
-
- private Transport createManualMQTTClient() throws IOException, URISyntaxException {
- Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
- new URI("tcp://localhost:"+mqttConnector.getConnectUri().getPort()), null);
- clientTransport.setTransportListener(new TransportListener() {
- @Override
- public void onCommand(Object command) {
- }
-
- @Override
- public void onException(IOException error) {
- exceptions.add(error);
- }
-
- @Override
- public void transportInterupted() {
- }
-
- @Override
- public void transportResumed() {
- }
- });
- return clientTransport;
- }
-
- @Test
- public void testPingKeepsInactivityMonitorAlive() throws Exception {
- addMQTTConnector();
- brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive((short)2);
- final BlockingConnection connection = mqtt.blockingConnection();
- connection.connect();
-
- assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return connection.isConnected();
- }
- }));
-
- connection.disconnect();
- }
-
- @Test
- public void testTurnOffInactivityMonitor()throws Exception{
- addMQTTConnector("?transport.useInactivityMonitor=false");
- brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive((short)2);
- final BlockingConnection connection = mqtt.blockingConnection();
- connection.connect();
-
- assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return connection.isConnected();
- }
- }));
-
- connection.disconnect();
- }
-
- @Test
- public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
- // default keep alive in milliseconds
- addMQTTConnector("?transport.defaultKeepAlive=2000");
- brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive((short)0);
- final BlockingConnection connection = mqtt.blockingConnection();
- connection.connect();
-
- assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return connection.isConnected();
- }
- }));
+ assertEquals(payload, new String(message));
+ }
+ provider.disconnect();
+ activeMQConnection.close();
}
- TransportConnector mqttConnector;
-
protected String getProtocolScheme() {
return "mqtt";
}
@@ -433,12 +337,9 @@ public class MQTTTest {
mqttConnector= brokerService.addConnector(getProtocolScheme()+"://localhost:0" + config);
}
- protected MQTT createMQTTConnection() throws Exception {
- MQTT mqtt = new MQTT();
- mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
- // shut off connect retry
- mqtt.setConnectAttemptsMax(0);
- mqtt.setReconnectAttemptsMax(0);
- return mqtt;
+ protected void initializeConnection(MQTTClientProvider provider) throws Exception {
+ provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort());
}
+
+ protected abstract MQTTClientProvider getMQTTClientProvider();
}
\ No newline at end of file
Added: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java?rev=1430496&view=auto
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java (added)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java Tue Jan 8 20:33:16 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+
+class FuseMQQTTClientProvider implements MQTTClientProvider {
+ private final MQTT mqtt = new MQTT();
+ private BlockingConnection connection;
+ @Override
+ public void connect(String host) throws Exception {
+ mqtt.setHost(host);
+ // shut off connect retry
+ mqtt.setConnectAttemptsMax(0);
+ mqtt.setReconnectAttemptsMax(0);
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ }
+
+ @Override
+ public void disconnect() throws Exception {
+ if (this.connection != null){
+ this.connection.disconnect();
+ }
+ }
+
+ @Override
+ public void publish(String topic, byte[] payload, int qos) throws Exception {
+ connection.publish(topic,payload, QoS.values()[qos],false);
+ }
+
+ @Override
+ public void subscribe(String topic, int qos) throws Exception {
+ Topic[] topics = {new Topic(utf8(topic), QoS.values()[qos])};
+ connection.subscribe(topics);
+ }
+
+ @Override
+ public byte[] receive(int timeout) throws Exception {
+ byte[] result = null;
+ Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
+ if (message != null){
+ result = message.getPayload();
+ message.ack();
+ }
+ return result;
+ }
+
+ @Override
+ public void setSslContext(SSLContext sslContext) {
+ mqtt.setSslContext(sslContext);
+ }
+}
Propchange: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java?rev=1430496&view=auto
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java (added)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java Tue Jan 8 20:33:16 2013
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+public interface MQTTClientProvider {
+ void connect(String host) throws Exception;
+ void disconnect() throws Exception;
+ void publish(String topic,byte[] payload,int qos) throws Exception;
+ void subscribe(String topic,int qos) throws Exception;
+ byte[] receive(int timeout) throws Exception;
+ void setSslContext(javax.net.ssl.SSLContext sslContext);
+
+}
Propchange: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java?rev=1430496&r1=1430495&r2=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java (original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java Tue Jan 8 20:33:16 2013
@@ -24,9 +24,7 @@ import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
-import org.apache.activemq.broker.BrokerService;
import org.fusesource.mqtt.client.MQTT;
-import org.junit.Ignore;
public class MQTTSSLTest extends MQTTTest {
@@ -55,6 +53,15 @@ public class MQTTSSLTest extends MQTTTes
return mqtt;
}
+ protected void initializeConnection(MQTTClientProvider provider) throws Exception {
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+ provider.setSslContext(ctx);
+ provider.connect("ssl://localhost:"+mqttConnector.getConnectUri().getPort());
+ }
+
+
+
static class DefaultTrustManager implements X509TrustManager {
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1430496&r1=1430495&r2=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Tue Jan 8 20:33:16 2013
@@ -16,348 +16,14 @@
*/
package org.apache.activemq.transport.mqtt;
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.ProtectionDomain;
-import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.net.SocketFactory;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
-import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.Message;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.codec.CONNECT;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
-public class MQTTTest {
-
- public File basedir() throws IOException {
- ProtectionDomain protectionDomain = getClass().getProtectionDomain();
- return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
- }
-
- protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
- protected BrokerService brokerService;
- protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
- protected int numberOfMessages;
- AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
-
- @Before
- public void startBroker() throws Exception {
- autoFailTestSupport.startAutoFailThread();
- exceptions.clear();
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.setAdvisorySupport(false);
- brokerService.setUseJmx(false);
- this.numberOfMessages = 3000;
- }
-
- @After
- public void stopBroker() throws Exception {
- if (brokerService != null) {
- brokerService.stop();
- }
- autoFailTestSupport.stopAutoFailThread();
- }
-
- @Test
- public void testSendAndReceiveMQTT() throws Exception {
- addMQTTConnector();
- brokerService.start();
- MQTT mqtt = createMQTTConnection();
- final BlockingConnection subscribeConnection = mqtt.blockingConnection();
- subscribeConnection.connect();
- Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
- Topic[] topics = {topic};
- subscribeConnection.subscribe(topics);
- final CountDownLatch latch = new CountDownLatch(numberOfMessages);
-
- Thread thread = new Thread(new Runnable() {
- public void run() {
- for (int i = 0; i < numberOfMessages; i++){
- try {
- Message message = subscribeConnection.receive(5, TimeUnit.SECONDS);
- assertNotNull("Should get a message", message);
- message.ack();
- latch.countDown();
- } catch (Exception e) {
- e.printStackTrace();
- break;
- }
-
- }
- }
- });
- thread.start();
-
- BlockingConnection publisherConnection = mqtt.blockingConnection();
- publisherConnection.connect();
- for (int i = 0; i < numberOfMessages; i++){
- String payload = "Message " + i;
- publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
- }
-
- latch.await(10, TimeUnit.SECONDS);
- assertEquals(0, latch.getCount());
- subscribeConnection.disconnect();
- publisherConnection.disconnect();
- }
-
- @Test
- public void testSendAndReceiveAtMostOnce() throws Exception {
- addMQTTConnector();
- brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive(Short.MAX_VALUE);
- BlockingConnection connection = mqtt.blockingConnection();
-
- connection.connect();
-
- Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
- connection.subscribe(topics);
- for (int i = 0; i < numberOfMessages; i++) {
- String payload = "Test Message: " + i;
- connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
- Message message = connection.receive(5, TimeUnit.SECONDS);
- assertNotNull("Should get a message", message);
- assertEquals(payload, new String(message.getPayload()));
- }
- connection.disconnect();
- }
-
- @Test
- public void testSendAndReceiveAtLeastOnce() throws Exception {
- addMQTTConnector();
- brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive(Short.MAX_VALUE);
- BlockingConnection connection = mqtt.blockingConnection();
-
- connection.connect();
-
- Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
- connection.subscribe(topics);
- for (int i = 0; i < numberOfMessages; i++) {
- String payload = "Test Message: " + i;
- connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
- Message message = connection.receive(5, TimeUnit.SECONDS);
- assertNotNull("Should get a message", message);
- message.ack();
- assertEquals(payload, new String(message.getPayload()));
- }
- connection.disconnect();
- }
-
- @Test
- public void testSendAndReceiveExactlyOnce() throws Exception {
- addMQTTConnector();
- brokerService.start();
- MQTT publisher = createMQTTConnection();
- BlockingConnection pubConnection = publisher.blockingConnection();
-
- pubConnection.connect();
-
- MQTT subscriber = createMQTTConnection();
- BlockingConnection subConnection = subscriber.blockingConnection();
-
- subConnection.connect();
-
- Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
- subConnection.subscribe(topics);
- for (int i = 0; i < numberOfMessages; i++) {
- String payload = "Test Message: " + i;
- pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
- Message message = subConnection.receive(5, TimeUnit.SECONDS);
- assertNotNull("Should get a message", message);
- LOG.debug(payload);
- message.ack();
- //System.err.println("Sent " + payload + " GOT " + new String(message.getPayload()));
- assertEquals(payload, new String(message.getPayload()));
- }
- subConnection.disconnect();
- pubConnection.disconnect();
- }
-
- @Test
- public void testSendAndReceiveLargeMessages() throws Exception {
- byte[] payload = new byte[1024 * 32];
- for (int i = 0; i < payload.length; i++){
- payload[i] = '2';
- }
- addMQTTConnector();
- brokerService.start();
-
- MQTT publisher = createMQTTConnection();
- BlockingConnection pubConnection = publisher.blockingConnection();
-
- pubConnection.connect();
-
- MQTT subscriber = createMQTTConnection();
- BlockingConnection subConnection = subscriber.blockingConnection();
-
- subConnection.connect();
-
- Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
- subConnection.subscribe(topics);
- for (int i = 0; i < 10; i++) {
- pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
- Message message = subConnection.receive(5, TimeUnit.SECONDS);
- assertNotNull("Should get a message", message);
- message.ack();
- assertArrayEquals(payload, message.getPayload());
- }
- subConnection.disconnect();
- pubConnection.disconnect();
- }
-
-
- @Test
- public void testSendMQTTReceiveJMS() throws Exception {
- addMQTTConnector();
- TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
- brokerService.start();
- MQTT mqtt = createMQTTConnection();
- BlockingConnection connection = mqtt.blockingConnection();
- final String DESTINATION_NAME = "foo.*";
- connection.connect();
-
- ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
- activeMQConnection.start();
- Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
- MessageConsumer consumer = s.createConsumer(jmsTopic);
-
- for (int i = 0; i < numberOfMessages; i++) {
- String payload = "Test Message: " + i;
- connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
- ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
- assertNotNull("Should get a message", message);
- ByteSequence bs = message.getContent();
- assertEquals(payload, new String(bs.data, bs.offset, bs.length));
- }
-
-
- activeMQConnection.close();
- connection.disconnect();
- }
-
- @Test
- public void testSendJMSReceiveMQTT() throws Exception {
- addMQTTConnector();
- TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
- brokerService.start();
- MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive(Short.MAX_VALUE);
- BlockingConnection connection = mqtt.blockingConnection();
- connection.connect();
-
- ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
- activeMQConnection.start();
- Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- javax.jms.Topic jmsTopic = s.createTopic("foo.far");
- MessageProducer producer = s.createProducer(jmsTopic);
-
- Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
- connection.subscribe(topics);
- for (int i = 0; i < numberOfMessages; i++) {
- String payload = "This is Test Message: " + i;
- TextMessage sendMessage = s.createTextMessage(payload);
- producer.send(sendMessage);
- Message message = connection.receive(5, TimeUnit.SECONDS);
- assertNotNull("Should get a message", message);
- message.ack();
- assertEquals(payload, new String(message.getPayload()));
- }
- connection.disconnect();
- }
-
- public void testInactivityTimeoutDisconnectsClient() throws Exception{
-
- addMQTTConnector();
- brokerService.start();
-
- // manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
- // from timing out
- Transport clientTransport = createManualMQTTClient();
- clientTransport.start();
- CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2);
- clientTransport.oneway(connectFrame.encode());
-
- // wait for broker to register the MQTT connection
- assertTrue("MQTT Connection should be registered.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return mqttConnector.getConnections().size() > 0;
- }
- }));
-
- // wait for broker to time out the MQTT connection due to inactivity
- assertTrue("MQTT Connection should be timed out.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return mqttConnector.getConnections().size() == 0;
- }
- }));
-
- assertTrue("Should have seen client transport exception", exceptions.size() > 0);
-
- clientTransport.stop();
- }
-
- private Transport createManualMQTTClient() throws IOException, URISyntaxException {
- Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
- new URI("tcp://localhost:"+mqttConnector.getConnectUri().getPort()), null);
- clientTransport.setTransportListener(new TransportListener() {
- @Override
- public void onCommand(Object command) {
- }
-
- @Override
- public void onException(IOException error) {
- exceptions.add(error);
- }
-
- @Override
- public void transportInterupted() {
- }
-
- @Override
- public void transportResumed() {
- }
- });
- return clientTransport;
- }
+public class MQTTTest extends AbstractMQTTTest {
@Test
public void testPingKeepsInactivityMonitorAlive() throws Exception {
@@ -419,7 +85,6 @@ public class MQTTTest {
}));
}
- TransportConnector mqttConnector;
protected String getProtocolScheme() {
return "mqtt";
@@ -433,6 +98,11 @@ public class MQTTTest {
mqttConnector= brokerService.addConnector(getProtocolScheme()+"://localhost:0" + config);
}
+ @Override
+ protected MQTTClientProvider getMQTTClientProvider() {
+ return new FuseMQQTTClientProvider();
+ }
+
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());