You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/02/19 20:09:42 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5065

Repository: activemq
Updated Branches:
  refs/heads/trunk 28c565c26 -> 0db7e69b4


https://issues.apache.org/jira/browse/AMQ-5065

Patch applied.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0db7e69b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0db7e69b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0db7e69b

Branch: refs/heads/trunk
Commit: 0db7e69b4eddda219c1623b94636d07ee47a0648
Parents: 28c565c
Author: Timothy Bish <ta...@gmai.com>
Authored: Wed Feb 19 14:09:34 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Wed Feb 19 14:09:34 2014 -0500

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 22 +++++---
 .../activemq/transport/mqtt/MQTTTest.java       | 56 +++++++++++++++++++-
 2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0db7e69b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 5b8f8c7..c270566 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -23,10 +23,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
-
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.*;
 import org.apache.activemq.store.PersistenceAdapterSupport;
@@ -338,23 +338,29 @@ public class MQTTProtocolConverter {
             } catch (IOException e) {
                 LOG.warn("Couldn't send SUBACK for " + command, e);
             }
-        } else {
-            LOG.warn("No topics defined for Subscription " + command);
-        }
-        //check retained messages
-        if (topics != null){
-            for (Topic topic:topics){
+            // check retained messages
+            for (int i = 0; i < topics.length; i++) {
+                final Topic topic = topics[i];
                 ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
                 for (PUBLISH msg : retainedMessages.getMessages(destination)) {
                     if( msg.payload().length > 0 ) {
                         try {
-                            getMQTTTransport().sendToMQTT(msg.encode());
+                            PUBLISH retainedCopy = new PUBLISH();
+                            retainedCopy.topicName(msg.topicName());
+                            retainedCopy.retain(msg.retain());
+                            retainedCopy.messageId(msg.messageId());
+                            retainedCopy.payload(msg.payload());
+                            // set QoS of retained message to maximum of subscription QoS
+                            retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]] : msg.qos());
+                            getMQTTTransport().sendToMQTT(retainedCopy.encode());
                         } catch (IOException e) {
                             LOG.warn("Couldn't send retained message " + msg, e);
                         }
                     }
                 }
             }
+        } else {
+            LOG.warn("No topics defined for Subscription " + command);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0db7e69b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 64a9b5f..73397de 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.net.ProtocolException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -28,6 +28,8 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+
+import static org.junit.Assert.assertArrayEquals;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.TransportConnector;
@@ -41,10 +43,10 @@ import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.client.Tracer;
 import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PUBLISH;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.junit.Assert.assertArrayEquals;
 
 public class MQTTTest extends AbstractMQTTTest {
 
@@ -324,6 +326,56 @@ public class MQTTTest extends AbstractMQTTTest {
         publisher.disconnect();
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testMQTTRetainQoS() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        String[] topics = { "AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE" };
+        for (int i = 0; i < topics.length; i++) {
+            final String topic = topics[i];
+
+            MQTT mqtt = createMQTTConnection();
+            mqtt.setClientId("foo");
+            mqtt.setKeepAlive((short)2);
+
+            final int[] actualQoS = {-1};
+            mqtt.setTracer(new Tracer() {
+                @Override
+                public void onReceive(MQTTFrame frame) {
+                    // validate the QoS
+                    if (frame.messageType() == PUBLISH.TYPE) {
+                        PUBLISH publish = new PUBLISH();
+                        try {
+                            publish.decode(frame);
+                        } catch (ProtocolException e) {
+                            fail ("Failed decoding " + e.getMessage());
+                        }
+                        actualQoS[0] = publish.qos().ordinal();
+                    }
+                }
+            });
+
+            final BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+            connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
+            connection.subscribe(new Topic[]{ new Topic(topic, QoS.valueOf(topic)) });
+
+            final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+            assertNotNull(msg);
+            assertEquals(topic, new String(msg.getPayload()));
+            int waitCount = 0;
+            while (actualQoS[0] == -1 && waitCount < 10) {
+                Thread.sleep(1000);
+                waitCount++;
+            }
+            assertEquals(i, actualQoS[0]);
+
+            connection.unsubscribe(new String[]{topic});
+            connection.disconnect();
+        }
+
+    }
 
     @Test(timeout=60 * 1000)
     public void testSendMQTTReceiveJMS() throws Exception {