You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2014/02/11 23:52:49 UTC

git commit: Attempts to fix many of the compatibility issues with MQTT highlighted by AMQ-5043.

Updated Branches:
  refs/heads/trunk 99d533c06 -> 973580603


Attempts to fix many of the compatibility issues with MQTT highlighted by AMQ-5043.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/97358060
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/97358060
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/97358060

Branch: refs/heads/trunk
Commit: 973580603097f5e620e4d7f375dbbbbbb3581c84
Parents: 99d533c
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Tue Feb 11 17:52:57 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Tue Feb 11 17:53:24 2014 -0500

----------------------------------------------------------------------
 .../activemq/filter/DestinationMapNode.java     |  9 +++
 activemq-mqtt/pom.xml                           |  6 ++
 .../transport/mqtt/MQTTProtocolConverter.java   | 65 ++++++++++++++------
 .../transport/mqtt/MQTTRetainedMessages.java    | 29 ++++++---
 .../activemq/transport/mqtt/IDERunner.java      | 39 ++++++++++++
 5 files changed, 123 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
index f9ca156..d52f9de 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
@@ -133,6 +133,15 @@ public class DestinationMapNode implements DestinationNode {
         }
     }
 
+    public void set(String[] paths, int idx, Object value) { // replaces the values of the node reached via paths[idx..] with value
+        if (idx >= paths.length) { // path fully consumed: this node is the target
+            values.clear(); // discard whatever was stored so only the new value remains
+            values.add(value);
+        } else {
+            getChildOrCreate(paths[idx]).set(paths, idx + 1, value); // recurse with set(), not add(), so the clear() above is reached at the leaf
+        }
+    }
+
     public void remove(String[] paths, int idx, Object value) {
         if (idx >= paths.length) {
             values.remove(value);

http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index 022fc7c..6b125e8 100755
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -135,6 +135,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-kahadb-store</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.eclipse.paho</groupId>
       <artifactId>mqtt-client</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 9c6fa12..48c18ce 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -95,6 +95,7 @@ public class MQTTProtocolConverter {
     }
 
     void sendToActiveMQ(Command command, ResponseHandler handler) {
+        System.out.println(mqttTransport.getInactivityMonitor()+" ==> "+command);
         command.setCommandId(generateCommandId());
         if (handler != null) {
             command.setResponseRequired(true);
@@ -308,15 +309,14 @@ public class MQTTProtocolConverter {
         //check retained messages
         if (topics != null){
             for (Topic topic:topics){
-                Buffer buffer = retainedMessages.getMessage(topic.name().toString());
-                if (buffer != null){
-                    PUBLISH msg = new PUBLISH();
-                    msg.payload(buffer);
-                    msg.topicName(topic.name());
-                    try {
-                        getMQTTTransport().sendToMQTT(msg.encode());
-                    } catch (IOException e) {
-                        LOG.warn("Couldn't send retained message " + msg, e);
+                ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
+                for (PUBLISH msg : retainedMessages.getMessages(destination)) {
+                    if( msg.payload().length > 0 ) {
+                        try {
+                            getMQTTTransport().sendToMQTT(msg.encode());
+                        } catch (IOException e) {
+                            LOG.warn("Couldn't send retained message " + msg, e);
+                        }
                     }
                 }
             }
@@ -333,7 +333,7 @@ public class MQTTProtocolConverter {
             consumerInfo.setDestination(destination);
             consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
             consumerInfo.setDispatchAsync(true);
-            if (!connect.cleanSession() && (connect.clientId() != null)) {
+            if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
                 consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
             }
             MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
@@ -418,10 +418,10 @@ public class MQTTProtocolConverter {
 
     void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
         checkConnected();
+        ActiveMQMessage message = convertMessage(command);
         if (command.retain()){
-            retainedMessages.addMessage(command.topicName().toString(),command.payload());
+            retainedMessages.addMessage((ActiveMQTopic) message.getDestination(), command);
         }
-        ActiveMQMessage message = convertMessage(command);
         message.setProducerId(producerId);
         message.onSend();
         sendToActiveMQ(message, createResponseHandler(command));
@@ -484,7 +484,7 @@ public class MQTTProtocolConverter {
         synchronized (activeMQTopicMap) {
             topic = activeMQTopicMap.get(command.topicName());
             if (topic == null) {
-                String topicName = command.topicName().toString().replaceAll("/", ".");
+                String topicName = convertMQTTToActiveMQ(command.topicName().toString());
                 topic = new ActiveMQTopic(topicName);
                 activeMQTopicMap.put(command.topicName(), topic);
             }
@@ -563,17 +563,21 @@ public class MQTTProtocolConverter {
         return mqttTransport;
     }
 
+    boolean willSent = false;
     public void onTransportError() {
         if (connect != null) {
-            if (connected.get() && connect.willTopic() != null && connect.willMessage() != null) {
+            if (connected.get() && connect.willTopic() != null && connect.willMessage() != null && !willSent) {
+                willSent = true;
                 try {
                     PUBLISH publish = new PUBLISH();
                     publish.topicName(connect.willTopic());
                     publish.qos(connect.willQos());
+                    publish.messageId((short) messageIdGenerator.getNextSequenceId());
                     publish.payload(connect.willMessage());
                     ActiveMQMessage message = convertMessage(publish);
                     message.setProducerId(producerId);
                     message.onSend();
+
                     sendToActiveMQ(message, null);
                 } catch (Exception e) {
                     LOG.warn("Failed to publish Will Message " + connect.willMessage());
@@ -703,10 +707,35 @@ public class MQTTProtocolConverter {
     }
 
     private String convertMQTTToActiveMQ(String name) {
-        String result = name.replace('#', '>');
-        result = result.replace('+', '*');
-        result = result.replace('/', '.');
-        return result;
+        char[] chars = name.toCharArray(); // private copy of name's characters, rewritten in place below
+        for (int i = 0; i < chars.length; i++) { // single pass: every character is mapped at most once
+            switch(chars[i]) { // characters not listed below are left unchanged (no default branch)
+
+                case '#': // '#' and '>' trade places
+                    chars[i] = '>';
+                    break;
+                case '>':
+                    chars[i] = '#';
+                    break;
+
+                case '+': // '+' and '*' trade places
+                    chars[i] = '*';
+                    break;
+                case '*':
+                    chars[i] = '+';
+                    break;
+
+                case '/': // '/' and '.' trade places
+                    chars[i] = '.';
+                    break;
+                case '.':
+                    chars[i] = '/';
+                    break;
+
+            }
+        }
+        String rc = new String(chars); // each pair is swapped both ways, so applying this method twice yields the input again
+        return rc;
     }
 
     public long getDefaultKeepAlive() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
index e502dce..250366d 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
@@ -18,36 +18,51 @@ package org.apache.activemq.transport.mqtt;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.filter.DestinationMapNode;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.codec.PUBLISH;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class MQTTRetainedMessages extends ServiceSupport {
     private static final Logger LOG = LoggerFactory.getLogger(MQTTRetainedMessages.class);
     private static final Object LOCK = new Object();
-    private LRUCache<String,Buffer> cache = new LRUCache<String, Buffer>(10000);
+
+    DestinationMapNode retainedMessages = new DestinationMapNode(null);
 
     private MQTTRetainedMessages(){
     }
 
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
-       cache.clear();
+        synchronized (this) { // same monitor that addMessage and getMessages hold while touching retainedMessages
+            retainedMessages = new DestinationMapNode(null); // forget everything retained so far by swapping in a fresh root node
+        }
     }
 
     @Override
     protected void doStart() throws Exception {
     }
 
-   public void addMessage(String destination,Buffer payload){
-       cache.put(destination,payload);
+   public void addMessage(ActiveMQTopic dest, PUBLISH publish){ // records publish in the retained-message tree under dest's destination paths
+       synchronized (this) { // same monitor as getMessages and doStop
+           retainedMessages.set(dest.getDestinationPaths(), 0, publish); // index 0: start from the first path element
+       }
    }
 
-   public Buffer getMessage(String destination){
-       return cache.get(destination);
+   public Set<PUBLISH> getMessages(ActiveMQTopic topic){ // returns the values in the retained-message tree that match topic's destination paths
+       Set answer = new HashSet(); // raw Set: filled by appendMatchingValues, cast on return; a fresh set is built on every call
+       synchronized (this) { // same monitor as addMessage and doStop
+           retainedMessages.appendMatchingValues(answer, topic.getDestinationPaths(), 0);
+       }
+       return (Set<PUBLISH>)answer; // unchecked cast; NOTE(review): relies on addMessage being the only writer, so every value is a PUBLISH - confirm
    }
 
     public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService broker){

http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java
new file mode 100644
index 0000000..48e34c4
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+import java.io.File;
+
+/**
+ * A little helper class for testing a broker in your IDE.
+ */
+public class IDERunner {
+
+    public static void main(String[] args) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.addConnector("mqtt://0.0.0.0:1883?trace=true"); // MQTT connector on port 1883, all interfaces, with trace=true
+        KahaDBStore kahaDb = new KahaDBStore();
+        kahaDb.setDirectory(new File("target/activemq-data/kahadb")); // relative path under target/
+        broker.setPersistenceAdapter(kahaDb);
+        broker.deleteAllMessages(); // requested before start(); presumably gives each run an empty store
+        broker.start();
+        broker.waitUntilStopped(); // hold the main thread for as long as the broker is running
+    }
+}