You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2014/02/11 23:52:49 UTC
git commit: Attempts to fix many of the compatibility issues with
MQTT highlighted by AMQ-5043.
Updated Branches:
refs/heads/trunk 99d533c06 -> 973580603
Attempts to fix many of the compatibility issues with MQTT highlighted by AMQ-5043.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/97358060
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/97358060
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/97358060
Branch: refs/heads/trunk
Commit: 973580603097f5e620e4d7f375dbbbbbb3581c84
Parents: 99d533c
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Tue Feb 11 17:52:57 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Tue Feb 11 17:53:24 2014 -0500
----------------------------------------------------------------------
.../activemq/filter/DestinationMapNode.java | 9 +++
activemq-mqtt/pom.xml | 6 ++
.../transport/mqtt/MQTTProtocolConverter.java | 65 ++++++++++++++------
.../transport/mqtt/MQTTRetainedMessages.java | 29 ++++++---
.../activemq/transport/mqtt/IDERunner.java | 39 ++++++++++++
5 files changed, 123 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
index f9ca156..d52f9de 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
@@ -133,6 +133,15 @@ public class DestinationMapNode implements DestinationNode {
}
}
+ public void set(String[] paths, int idx, Object value) {
+ if (idx >= paths.length) {
+ values.clear();
+ values.add(value);
+ } else {
+ getChildOrCreate(paths[idx]).add(paths, idx + 1, value);
+ }
+ }
+
public void remove(String[] paths, int idx, Object value) {
if (idx >= paths.length) {
values.remove(value);
http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index 022fc7c..6b125e8 100755
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -135,6 +135,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>mqtt-client</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 9c6fa12..48c18ce 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -95,6 +95,7 @@ public class MQTTProtocolConverter {
}
void sendToActiveMQ(Command command, ResponseHandler handler) {
+ System.out.println(mqttTransport.getInactivityMonitor()+" ==> "+command);
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
@@ -308,15 +309,14 @@ public class MQTTProtocolConverter {
//check retained messages
if (topics != null){
for (Topic topic:topics){
- Buffer buffer = retainedMessages.getMessage(topic.name().toString());
- if (buffer != null){
- PUBLISH msg = new PUBLISH();
- msg.payload(buffer);
- msg.topicName(topic.name());
- try {
- getMQTTTransport().sendToMQTT(msg.encode());
- } catch (IOException e) {
- LOG.warn("Couldn't send retained message " + msg, e);
+ ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
+ for (PUBLISH msg : retainedMessages.getMessages(destination)) {
+ if( msg.payload().length > 0 ) {
+ try {
+ getMQTTTransport().sendToMQTT(msg.encode());
+ } catch (IOException e) {
+ LOG.warn("Couldn't send retained message " + msg, e);
+ }
}
}
}
@@ -333,7 +333,7 @@ public class MQTTProtocolConverter {
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
consumerInfo.setDispatchAsync(true);
- if (!connect.cleanSession() && (connect.clientId() != null)) {
+ if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
@@ -418,10 +418,10 @@ public class MQTTProtocolConverter {
void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
checkConnected();
+ ActiveMQMessage message = convertMessage(command);
if (command.retain()){
- retainedMessages.addMessage(command.topicName().toString(),command.payload());
+ retainedMessages.addMessage((ActiveMQTopic) message.getDestination(), command);
}
- ActiveMQMessage message = convertMessage(command);
message.setProducerId(producerId);
message.onSend();
sendToActiveMQ(message, createResponseHandler(command));
@@ -484,7 +484,7 @@ public class MQTTProtocolConverter {
synchronized (activeMQTopicMap) {
topic = activeMQTopicMap.get(command.topicName());
if (topic == null) {
- String topicName = command.topicName().toString().replaceAll("/", ".");
+ String topicName = convertMQTTToActiveMQ(command.topicName().toString());
topic = new ActiveMQTopic(topicName);
activeMQTopicMap.put(command.topicName(), topic);
}
@@ -563,17 +563,21 @@ public class MQTTProtocolConverter {
return mqttTransport;
}
+ boolean willSent = false;
public void onTransportError() {
if (connect != null) {
- if (connected.get() && connect.willTopic() != null && connect.willMessage() != null) {
+ if (connected.get() && connect.willTopic() != null && connect.willMessage() != null && !willSent) {
+ willSent = true;
try {
PUBLISH publish = new PUBLISH();
publish.topicName(connect.willTopic());
publish.qos(connect.willQos());
+ publish.messageId((short) messageIdGenerator.getNextSequenceId());
publish.payload(connect.willMessage());
ActiveMQMessage message = convertMessage(publish);
message.setProducerId(producerId);
message.onSend();
+
sendToActiveMQ(message, null);
} catch (Exception e) {
LOG.warn("Failed to publish Will Message " + connect.willMessage());
@@ -703,10 +707,35 @@ public class MQTTProtocolConverter {
}
private String convertMQTTToActiveMQ(String name) {
- String result = name.replace('#', '>');
- result = result.replace('+', '*');
- result = result.replace('/', '.');
- return result;
+ char[] chars = name.toCharArray();
+ for (int i = 0; i < chars.length; i++) {
+ switch(chars[i]) {
+
+ case '#':
+ chars[i] = '>';
+ break;
+ case '>':
+ chars[i] = '#';
+ break;
+
+ case '+':
+ chars[i] = '*';
+ break;
+ case '*':
+ chars[i] = '+';
+ break;
+
+ case '/':
+ chars[i] = '.';
+ break;
+ case '.':
+ chars[i] = '/';
+ break;
+
+ }
+ }
+ String rc = new String(chars);
+ return rc;
}
public long getDefaultKeepAlive() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
index e502dce..250366d 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
@@ -18,36 +18,51 @@ package org.apache.activemq.transport.mqtt;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.filter.DestinationMapNode;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.codec.PUBLISH;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
public class MQTTRetainedMessages extends ServiceSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTRetainedMessages.class);
private static final Object LOCK = new Object();
- private LRUCache<String,Buffer> cache = new LRUCache<String, Buffer>(10000);
+
+ DestinationMapNode retainedMessages = new DestinationMapNode(null);
private MQTTRetainedMessages(){
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
- cache.clear();
+ synchronized (this) {
+ retainedMessages = new DestinationMapNode(null);
+ }
}
@Override
protected void doStart() throws Exception {
}
- public void addMessage(String destination,Buffer payload){
- cache.put(destination,payload);
+ public void addMessage(ActiveMQTopic dest, PUBLISH publish){
+ synchronized (this) {
+ retainedMessages.set(dest.getDestinationPaths(), 0, publish);
+ }
}
- public Buffer getMessage(String destination){
- return cache.get(destination);
+ public Set<PUBLISH> getMessages(ActiveMQTopic topic){
+ Set answer = new HashSet();
+ synchronized (this) {
+ retainedMessages.appendMatchingValues(answer, topic.getDestinationPaths(), 0);
+ }
+ return (Set<PUBLISH>)answer;
}
public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService broker){
http://git-wip-us.apache.org/repos/asf/activemq/blob/97358060/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java
new file mode 100644
index 0000000..48e34c4
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/IDERunner.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+import java.io.File;
+
+/**
+ * A little helper class for testing a broker in your IDE.
+ */
+public class IDERunner {
+
+ public static void main(String[]args) throws Exception {
+ BrokerService bs = new BrokerService();
+ bs.addConnector("mqtt://0.0.0.0:1883?trace=true");
+ KahaDBStore store = new KahaDBStore();
+ store.setDirectory(new File("target/activemq-data/kahadb"));
+ bs.setPersistenceAdapter(store);
+ bs.deleteAllMessages();
+ bs.start();
+ bs.waitUntilStopped();
+ }
+}