You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2014/02/12 16:59:38 UTC

git commit: More improvements for AMQ-5043.

Updated Branches:
  refs/heads/trunk 084d606d8 -> e2a7d6af5


More improvements for AMQ-5043.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e2a7d6af
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e2a7d6af
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e2a7d6af

Branch: refs/heads/trunk
Commit: e2a7d6af5a447330e3e180e681c9964332c36558
Parents: 084d606
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Wed Feb 12 10:59:31 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Wed Feb 12 10:59:40 2014 -0500

----------------------------------------------------------------------
 .../activemq/filter/DestinationMapNode.java     |  2 +-
 .../transport/mqtt/MQTTProtocolConverter.java   | 17 ++++++---
 .../transport/mqtt/MQTTTransportFilter.java     | 37 +++++++++++++++++---
 3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e2a7d6af/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
index d52f9de..a2360a0 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
@@ -138,7 +138,7 @@ public class DestinationMapNode implements DestinationNode {
             values.clear();
             values.add(value);
         } else {
-            getChildOrCreate(paths[idx]).add(paths, idx + 1, value);
+            getChildOrCreate(paths[idx]).set(paths, idx + 1, value);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e2a7d6af/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 48c18ce..f7c3c1e 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -95,7 +95,6 @@ public class MQTTProtocolConverter {
     }
 
     void sendToActiveMQ(Command command, ResponseHandler handler) {
-        System.out.println(mqttTransport.getInactivityMonitor()+" ==> "+command);
         command.setCommandId(generateCommandId());
         if (handler != null) {
             command.setResponseRequired(true);
@@ -256,10 +255,18 @@ public class MQTTProtocolConverter {
     public void deleteDurableSubs(List<SubscriptionInfo> subs) {
         try {
             for (SubscriptionInfo sub : subs) {
-                TopicMessageStore store = brokerService.getPersistenceAdapter().createTopicMessageStore((ActiveMQTopic) sub.getDestination());
-                store.deleteSubscription(connectionInfo.getClientId(), sub.getSubscriptionName());
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(connectionId);
+                rsi.setSubscriptionName(sub.getSubcriptionName());
+                rsi.setClientId(sub.getClientId());
+                sendToActiveMQ(rsi, new ResponseHandler() {
+                    @Override
+                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+                        // ignore failures..
+                    }
+                });
             }
-        } catch (IOException e) {
+        } catch (Throwable e) {
             LOG.warn("Could not delete the MQTT durable subs.", e);
         }
     }
@@ -477,7 +484,7 @@ public class MQTTProtocolConverter {
         msg.setMessageId(id);
         msg.setTimestamp(System.currentTimeMillis());
         msg.setPriority((byte) Message.DEFAULT_PRIORITY);
-        msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
+        msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
         msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
 
         ActiveMQTopic topic;

http://git-wip-us.apache.org/repos/asf/activemq/blob/e2a7d6af/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index 1dcf3dc..54f40e7 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
+import java.net.ProtocolException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -31,7 +32,7 @@ import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,11 +74,11 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
     @Override
     public void onCommand(Object command) {
         try {
+            MQTTFrame frame = (MQTTFrame) command;
             if (trace) {
-                TRACE.trace("Received: \n" + command);
+                TRACE.trace("Received: " + toString(frame));
             }
-
-            protocolConverter.onMQTTCommand((MQTTFrame) command);
+            protocolConverter.onMQTTCommand(frame);
         } catch (IOException e) {
             handleException(e);
         } catch (JMSException e) {
@@ -97,7 +98,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
     public void sendToMQTT(MQTTFrame command) throws IOException {
         if( !stopped.get() ) {
             if (trace) {
-                TRACE.trace("Sending: \n" + command);
+                TRACE.trace("Sending : " + toString(command));
             }
             Transport n = next;
             if (n != null) {
@@ -106,6 +107,32 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
         }
     }
 
+    static private String toString(MQTTFrame frame) {
+        if( frame == null )
+            return null;
+        try {
+            switch (frame.messageType()) {
+                case PINGREQ.TYPE: return new PINGREQ().decode(frame).toString();
+                case PINGRESP.TYPE: return new PINGRESP().decode(frame).toString();
+                case CONNECT.TYPE: return new CONNECT().decode(frame).toString();
+                case DISCONNECT.TYPE: return new DISCONNECT().decode(frame).toString();
+                case SUBSCRIBE.TYPE: return new SUBSCRIBE().decode(frame).toString();
+                case UNSUBSCRIBE.TYPE: return new UNSUBSCRIBE().decode(frame).toString();
+                case PUBLISH.TYPE: return new PUBLISH().decode(frame).toString();
+                case PUBACK.TYPE: return new PUBACK().decode(frame).toString();
+                case PUBREC.TYPE: return new PUBREC().decode(frame).toString();
+                case PUBREL.TYPE: return new PUBREL().decode(frame).toString();
+                case PUBCOMP.TYPE: return new PUBCOMP().decode(frame).toString();
+                case CONNACK.TYPE: return new CONNACK().decode(frame).toString();
+                case SUBACK.TYPE: return new SUBACK().decode(frame).toString();
+                default: return frame.toString();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+            return frame.toString();
+        }
+    }
+
     @Override
     public void stop() throws Exception {
         if( stopped.compareAndSet(false, true) ) {