You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2014/02/12 16:59:38 UTC
git commit: More improvements for AMQ-5043.
Updated Branches:
refs/heads/trunk 084d606d8 -> e2a7d6af5
More improvements for AMQ-5043.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e2a7d6af
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e2a7d6af
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e2a7d6af
Branch: refs/heads/trunk
Commit: e2a7d6af5a447330e3e180e681c9964332c36558
Parents: 084d606
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Wed Feb 12 10:59:31 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Wed Feb 12 10:59:40 2014 -0500
----------------------------------------------------------------------
.../activemq/filter/DestinationMapNode.java | 2 +-
.../transport/mqtt/MQTTProtocolConverter.java | 17 ++++++---
.../transport/mqtt/MQTTTransportFilter.java | 37 +++++++++++++++++---
3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e2a7d6af/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
index d52f9de..a2360a0 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
@@ -138,7 +138,7 @@ public class DestinationMapNode implements DestinationNode {
values.clear();
values.add(value);
} else {
- getChildOrCreate(paths[idx]).add(paths, idx + 1, value);
+ getChildOrCreate(paths[idx]).set(paths, idx + 1, value);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e2a7d6af/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 48c18ce..f7c3c1e 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -95,7 +95,6 @@ public class MQTTProtocolConverter {
}
void sendToActiveMQ(Command command, ResponseHandler handler) {
- System.out.println(mqttTransport.getInactivityMonitor()+" ==> "+command);
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
@@ -256,10 +255,18 @@ public class MQTTProtocolConverter {
public void deleteDurableSubs(List<SubscriptionInfo> subs) {
try {
for (SubscriptionInfo sub : subs) {
- TopicMessageStore store = brokerService.getPersistenceAdapter().createTopicMessageStore((ActiveMQTopic) sub.getDestination());
- store.deleteSubscription(connectionInfo.getClientId(), sub.getSubscriptionName());
+ RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+ rsi.setConnectionId(connectionId);
+ rsi.setSubscriptionName(sub.getSubcriptionName());
+ rsi.setClientId(sub.getClientId());
+ sendToActiveMQ(rsi, new ResponseHandler() {
+ @Override
+ public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+ // ignore failures..
+ }
+ });
}
- } catch (IOException e) {
+ } catch (Throwable e) {
LOG.warn("Could not delete the MQTT durable subs.", e);
}
}
@@ -477,7 +484,7 @@ public class MQTTProtocolConverter {
msg.setMessageId(id);
msg.setTimestamp(System.currentTimeMillis());
msg.setPriority((byte) Message.DEFAULT_PRIORITY);
- msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
+ msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
ActiveMQTopic topic;
http://git-wip-us.apache.org/repos/asf/activemq/blob/e2a7d6af/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index 1dcf3dc..54f40e7 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
+import java.net.ProtocolException;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,7 +32,7 @@ import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,11 +74,11 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
@Override
public void onCommand(Object command) {
try {
+ MQTTFrame frame = (MQTTFrame) command;
if (trace) {
- TRACE.trace("Received: \n" + command);
+ TRACE.trace("Received: " + toString(frame));
}
-
- protocolConverter.onMQTTCommand((MQTTFrame) command);
+ protocolConverter.onMQTTCommand(frame);
} catch (IOException e) {
handleException(e);
} catch (JMSException e) {
@@ -97,7 +98,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
public void sendToMQTT(MQTTFrame command) throws IOException {
if( !stopped.get() ) {
if (trace) {
- TRACE.trace("Sending: \n" + command);
+ TRACE.trace("Sending : " + toString(command));
}
Transport n = next;
if (n != null) {
@@ -106,6 +107,32 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
}
}
+ static private String toString(MQTTFrame frame) {
+ if( frame == null )
+ return null;
+ try {
+ switch (frame.messageType()) {
+ case PINGREQ.TYPE: return new PINGREQ().decode(frame).toString();
+ case PINGRESP.TYPE: return new PINGRESP().decode(frame).toString();
+ case CONNECT.TYPE: return new CONNECT().decode(frame).toString();
+ case DISCONNECT.TYPE: return new DISCONNECT().decode(frame).toString();
+ case SUBSCRIBE.TYPE: return new SUBSCRIBE().decode(frame).toString();
+ case UNSUBSCRIBE.TYPE: return new UNSUBSCRIBE().decode(frame).toString();
+ case PUBLISH.TYPE: return new PUBLISH().decode(frame).toString();
+ case PUBACK.TYPE: return new PUBACK().decode(frame).toString();
+ case PUBREC.TYPE: return new PUBREC().decode(frame).toString();
+ case PUBREL.TYPE: return new PUBREL().decode(frame).toString();
+ case PUBCOMP.TYPE: return new PUBCOMP().decode(frame).toString();
+ case CONNACK.TYPE: return new CONNACK().decode(frame).toString();
+ case SUBACK.TYPE: return new SUBACK().decode(frame).toString();
+ default: return frame.toString();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ return frame.toString();
+ }
+ }
+
@Override
public void stop() throws Exception {
if( stopped.compareAndSet(false, true) ) {