You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:37 UTC

[10/13] git commit: Fixed AMQ-5160, remove durable subscription in onUnsubscribe()

Fixed AMQ-5160, remove durable subscription in onUnsubscribe()


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/88c6ee97
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/88c6ee97
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/88c6ee97

Branch: refs/heads/trunk
Commit: 88c6ee97e0fc8652f2dba16786cb8cbd7b80a1b7
Parents: 8947a09
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 00:29:40 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 50 ++++++++++++--------
 1 file changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/88c6ee97/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index cbb6415..71a6fcf 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -361,12 +361,13 @@ public class MQTTProtocolConverter {
         ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString()));
 
         if( mqttSubscriptionByTopic.containsKey(topicName)) {
-            if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) {
+            final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
+            if (topicQoS != mqttSubscription.qos()) {
                 // remove old subscription as the QoS has changed
                 onUnSubscribe(topicName);
             } else {
                 // duplicate SUBSCRIBE packet, find all matching topics and resend retained messages
-                resendRetainedMessages(topicName, destination);
+                resendRetainedMessages(topicName, destination, mqttSubscription);
 
                 return (byte) topicQoS.ordinal();
             }
@@ -408,7 +409,8 @@ public class MQTTProtocolConverter {
         return qos[0];
     }
 
-    private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination) throws MQTTProtocolException {
+    private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination,
+                                        MQTTSubscription mqttSubscription) throws MQTTProtocolException {
         // get TopicRegion
         RegionBroker regionBroker;
         try {
@@ -418,25 +420,26 @@ public class MQTTProtocolConverter {
         }
         final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
 
+        final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
+        final ConsumerId consumerId = consumerInfo.getConsumerId();
+
+        // use actual client id used to create connection to lookup connection context
+        final String connectionInfoClientId = connectionInfo.getClientId();
+        final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
+
         // get all matching Topics
         final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
         for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
-            // find matching MQTT subscription for this client
-            final String mqttTopicName = convertActiveMQToMQTT(dest.getName());
-            final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(new UTF8Buffer(mqttTopicName));
-            if (mqttSubscription != null) {
-                // recover retroactive messages for matching subscription
-                final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
-                final ConsumerId consumerId = consumerInfo.getConsumerId();
-                final Subscription subscription = topicRegion.getSubscriptions().get(consumerId);
-
-                // use actual client id used to create connection to lookup connection context
-                final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfo.getClientId());
-                try {
-                    ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
-                } catch (Exception e) {
-                    throw new MQTTProtocolException("Error recovering retained messages for " +
-                        mqttTopicName + ": " + e.getMessage(), false, e);
+
+            // recover retroactive messages for matching subscriptions
+            for (Subscription subscription : dest.getConsumers()) {
+                if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
+                    try {
+                        ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
+                    } catch (Exception e) {
+                        throw new MQTTProtocolException("Error recovering retained messages for " +
+                            dest.getName() + ": " + e.getMessage(), false, e);
+                    }
                 }
             }
         }
@@ -467,6 +470,15 @@ public class MQTTProtocolConverter {
                 removeInfo = info.createRemoveCommand();
             }
             sendToActiveMQ(removeInfo, null);
+
+            // check if the durable sub also needs to be removed
+            if (subs.getConsumerInfo().getSubscriptionName() != null) {
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(connectionId);
+                rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName());
+                rsi.setClientId(connectionInfo.getClientId());
+                sendToActiveMQ(rsi, null);
+            }
         }
     }