You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/02/19 20:09:42 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5065
Repository: activemq
Updated Branches:
refs/heads/trunk 28c565c26 -> 0db7e69b4
https://issues.apache.org/jira/browse/AMQ-5065
Patch applied.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0db7e69b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0db7e69b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0db7e69b
Branch: refs/heads/trunk
Commit: 0db7e69b4eddda219c1623b94636d07ee47a0648
Parents: 28c565c
Author: Timothy Bish <ta...@gmai.com>
Authored: Wed Feb 19 14:09:34 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Wed Feb 19 14:09:34 2014 -0500
----------------------------------------------------------------------
.../transport/mqtt/MQTTProtocolConverter.java | 22 +++++---
.../activemq/transport/mqtt/MQTTTest.java | 56 +++++++++++++++++++-
2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/0db7e69b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 5b8f8c7..c270566 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -23,10 +23,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
-
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.*;
import org.apache.activemq.store.PersistenceAdapterSupport;
@@ -338,23 +338,29 @@ public class MQTTProtocolConverter {
} catch (IOException e) {
LOG.warn("Couldn't send SUBACK for " + command, e);
}
- } else {
- LOG.warn("No topics defined for Subscription " + command);
- }
- //check retained messages
- if (topics != null){
- for (Topic topic:topics){
+ // check retained messages
+ for (int i = 0; i < topics.length; i++) {
+ final Topic topic = topics[i];
ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
for (PUBLISH msg : retainedMessages.getMessages(destination)) {
if( msg.payload().length > 0 ) {
try {
- getMQTTTransport().sendToMQTT(msg.encode());
+ PUBLISH retainedCopy = new PUBLISH();
+ retainedCopy.topicName(msg.topicName());
+ retainedCopy.retain(msg.retain());
+ retainedCopy.messageId(msg.messageId());
+ retainedCopy.payload(msg.payload());
+ // set QoS of retained message to maximum of subscription QoS
+ retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]] : msg.qos());
+ getMQTTTransport().sendToMQTT(retainedCopy.encode());
} catch (IOException e) {
LOG.warn("Couldn't send retained message " + msg, e);
}
}
}
}
+ } else {
+ LOG.warn("No topics defined for Subscription " + command);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/0db7e69b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 64a9b5f..73397de 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,11 +16,11 @@
*/
package org.apache.activemq.transport.mqtt;
+import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -28,6 +28,8 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+
+import static org.junit.Assert.assertArrayEquals;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
@@ -41,10 +43,10 @@ import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.assertArrayEquals;
public class MQTTTest extends AbstractMQTTTest {
@@ -324,6 +326,56 @@ public class MQTTTest extends AbstractMQTTTest {
publisher.disconnect();
}
+ @Test(timeout = 60 * 1000)
+ public void testMQTTRetainQoS() throws Exception {
+ addMQTTConnector();
+ brokerService.start();
+
+ String[] topics = { "AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE" };
+ for (int i = 0; i < topics.length; i++) {
+ final String topic = topics[i];
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("foo");
+ mqtt.setKeepAlive((short)2);
+
+ final int[] actualQoS = {-1};
+ mqtt.setTracer(new Tracer() {
+ @Override
+ public void onReceive(MQTTFrame frame) {
+ // validate the QoS
+ if (frame.messageType() == PUBLISH.TYPE) {
+ PUBLISH publish = new PUBLISH();
+ try {
+ publish.decode(frame);
+ } catch (ProtocolException e) {
+ fail ("Failed decoding " + e.getMessage());
+ }
+ actualQoS[0] = publish.qos().ordinal();
+ }
+ }
+ });
+
+ final BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection.subscribe(new Topic[]{ new Topic(topic, QoS.valueOf(topic)) });
+
+ final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ assertEquals(topic, new String(msg.getPayload()));
+ int waitCount = 0;
+ while (actualQoS[0] == -1 && waitCount < 10) {
+ Thread.sleep(1000);
+ waitCount++;
+ }
+ assertEquals(i, actualQoS[0]);
+
+ connection.unsubscribe(new String[]{topic});
+ connection.disconnect();
+ }
+
+ }
@Test(timeout=60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {