You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/18 22:22:51 UTC
svn commit: r1494283 - /activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Author: tabish
Date: Tue Jun 18 20:22:51 2013
New Revision: 1494283
URL: http://svn.apache.org/r1494283
Log:
test case for: https://issues.apache.org/jira/browse/AMQ-4585
Modified:
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1494283&r1=1494282&r2=1494283&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Tue Jun 18 20:22:51 2013
@@ -114,61 +114,76 @@ public class MQTTTest extends AbstractMQ
@Test(timeout=300000)
public void testReceiveMessageSentWhileOffline() throws Exception {
- addMQTTConnector();
- brokerService.start();
- final MQTTClientProvider publisher = getMQTTClientProvider();
- initializeConnection(publisher);
+ byte[] payload = new byte[1024 * 32];
+ for (int i = 0; i < payload.length; i++){
+ payload[i] = '2';
+ }
- MQTT mqtt = createMQTTConnection();
- mqtt.setClientId("MQTT-Client");
- mqtt.setCleanSession(false);
+ int numberOfRuns = 100;
+ int messagesPerRun = 2;
- {
- final BlockingConnection subscriber = mqtt.blockingConnection();
- subscriber.connect();
- Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
- subscriber.subscribe(topic);
-
- for (int i = 0; i < numberOfMessages; i++) {
- String payload = "Test Message: " + i;
- publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
- }
+ addMQTTConnector("trace=true");
+ brokerService.start();
+ MQTT mqttPub = createMQTTConnection();
+ mqttPub.setClientId("MQTT-Pub-Client");
- for (int i = 0; i < numberOfMessages / 2; i++) {
- Message message = subscriber.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
- byte[] payload = message.getPayload();
- String messageContent = new String(payload);
- if (i % 100 == 0) {
- LOG.debug("Received message from topic: " + message.getTopic() +
- " Message content: " + messageContent);
- }
- message.ack();
- }
+ MQTT mqttSub = createMQTTConnection();
+ mqttSub.setClientId("MQTT-Sub-Client");
+ mqttSub.setCleanSession(false);
- subscriber.disconnect();
- }
+ final BlockingConnection connectionPub = mqttPub.blockingConnection();
+ connectionPub.connect();
+
+ BlockingConnection connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
- publisher.disconnect();
+ Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
+ connectionSub.subscribe(topics);
- final BlockingConnection subscriber = mqtt.blockingConnection();
- subscriber.connect();
- Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
- subscriber.subscribe(topic);
+ for (int i = 0; i < messagesPerRun; ++i) {
+ connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+ }
- for (int i = 0; i < numberOfMessages / 2; i++) {
- Message message = subscriber.receive(5, TimeUnit.SECONDS);
+ int received = 0;
+ for (int i = 0; i < messagesPerRun; ++i) {
+ Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
- byte[] payload = message.getPayload();
+ received++;
+ payload = message.getPayload();
String messageContent = new String(payload);
- if (i % 100 == 0) {
- LOG.debug("Received message from topic: " + message.getTopic() +
- " Message content: " + messageContent);
- }
+ LOG.info("Received message from topic: " + message.getTopic() +
+ " Message content: " + messageContent);
message.ack();
}
+ connectionSub.disconnect();
+
+ for(int j = 0; j < numberOfRuns; j++) {
+
+ for (int i = 0; i < messagesPerRun; ++i) {
+ connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+ }
+
+ mqttSub = createMQTTConnection();
+ mqttSub.setClientId("MQTT-Sub-Client");
+ mqttSub.setCleanSession(false);
+
+ connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+ connectionSub.subscribe(topics);
- subscriber.disconnect();
+ for (int i = 0; i < messagesPerRun; ++i) {
+ Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ received++;
+ payload = message.getPayload();
+ String messageContent = new String(payload);
+ LOG.info("Received message from topic: " + message.getTopic() +
+ " Message content: " + messageContent);
+ message.ack();
+ }
+ connectionSub.disconnect();
+ }
+ assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
}
@Test(timeout=30000)
@@ -219,7 +234,7 @@ public class MQTTTest extends AbstractMQ
@Override
protected void addMQTTConnector() throws Exception {
- addMQTTConnector("");
+ addMQTTConnector();
}
@Override
@@ -249,12 +264,12 @@ public class MQTTTest extends AbstractMQ
return new Tracer(){
@Override
public void onReceive(MQTTFrame frame) {
-// LOG.info("Client Received:\n"+frame);
+ LOG.info("Client Received:\n"+frame);
}
@Override
public void onSend(MQTTFrame frame) {
-// LOG.info("Client Sent:\n" + frame);
+ LOG.info("Client Sent:\n" + frame);
}
@Override