You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:32 UTC
[05/13] git commit: Fixed AMQ-5160,
fixed race condition for retained messages
Fixed AMQ-5160, fixed race condition for retained messages
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/86440903
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/86440903
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/86440903
Branch: refs/heads/trunk
Commit: 8644090377eeef09a5afb2e46594c4fa4c311aae
Parents: c915b19
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 12:16:44 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200
----------------------------------------------------------------------
.../transport/mqtt/MQTTProtocolConverter.java | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/86440903/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 71a6fcf..88e684e 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -386,6 +386,10 @@ public class MQTTProtocolConverter {
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
+ // optimistic add to local maps first to be able to handle commands in onActiveMQCommand
+ subscriptionsByConsumerId.put(id, mqttSubscription);
+ mqttSubscriptionByTopic.put(topicName, mqttSubscription);
+
final byte[] qos = {-1};
sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override
@@ -401,9 +405,10 @@ public class MQTTProtocolConverter {
}
});
- if (qos[0] != SUBSCRIBE_ERROR) {
- subscriptionsByConsumerId.put(id, mqttSubscription);
- mqttSubscriptionByTopic.put(topicName, mqttSubscription);
+ if (qos[0] == SUBSCRIBE_ERROR) {
+ // remove from local maps if subscribe failed
+ subscriptionsByConsumerId.remove(id);
+ mqttSubscriptionByTopic.remove(topicName);
}
return qos[0];
@@ -431,7 +436,7 @@ public class MQTTProtocolConverter {
final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
- // recover retroactive messages for matching subscriptions
+ // recover retroactive messages for matching subscription
for (Subscription subscription : dest.getConsumers()) {
if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
try {
@@ -440,6 +445,7 @@ public class MQTTProtocolConverter {
throw new MQTTProtocolException("Error recovering retained messages for " +
dest.getName() + ": " + e.getMessage(), false, e);
}
+ break;
}
}
}
@@ -483,7 +489,7 @@ public class MQTTProtocolConverter {
}
/**
- * Dispatch a ActiveMQ command
+ * Dispatch an ActiveMQ command
*/
public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) {