You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:32 UTC

[05/13] git commit: Fixed AMQ-5160, fixed race condition for retained messages

Fixed AMQ-5160, fixed race condition for retained messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/86440903
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/86440903
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/86440903

Branch: refs/heads/trunk
Commit: 8644090377eeef09a5afb2e46594c4fa4c311aae
Parents: c915b19
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 12:16:44 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java       | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/86440903/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 71a6fcf..88e684e 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -386,6 +386,10 @@ public class MQTTProtocolConverter {
         }
         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
 
+        // optimistic add to local maps first to be able to handle commands in onActiveMQCommand
+        subscriptionsByConsumerId.put(id, mqttSubscription);
+        mqttSubscriptionByTopic.put(topicName, mqttSubscription);
+
         final byte[] qos = {-1};
         sendToActiveMQ(consumerInfo, new ResponseHandler() {
             @Override
@@ -401,9 +405,10 @@ public class MQTTProtocolConverter {
             }
         });
 
-        if (qos[0] != SUBSCRIBE_ERROR) {
-            subscriptionsByConsumerId.put(id, mqttSubscription);
-            mqttSubscriptionByTopic.put(topicName, mqttSubscription);
+        if (qos[0] == SUBSCRIBE_ERROR) {
+            // remove from local maps if subscribe failed
+            subscriptionsByConsumerId.remove(id);
+            mqttSubscriptionByTopic.remove(topicName);
         }
 
         return qos[0];
@@ -431,7 +436,7 @@ public class MQTTProtocolConverter {
         final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
         for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
 
-            // recover retroactive messages for matching subscriptions
+            // recover retroactive messages for matching subscription
             for (Subscription subscription : dest.getConsumers()) {
                 if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
                     try {
@@ -440,6 +445,7 @@ public class MQTTProtocolConverter {
                         throw new MQTTProtocolException("Error recovering retained messages for " +
                             dest.getName() + ": " + e.getMessage(), false, e);
                     }
+                    break;
                 }
             }
         }
@@ -483,7 +489,7 @@ public class MQTTProtocolConverter {
     }
 
     /**
-     * Dispatch a ActiveMQ command
+     * Dispatch an ActiveMQ command
      */
     public void onActiveMQCommand(Command command) throws Exception {
         if (command.isResponse()) {