You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:37 UTC
[10/13] git commit: Fixed AMQ-5160, remove durable subscription in onUnsubscribe()
Fixed AMQ-5160, remove durable subscription in onUnsubscribe()
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/88c6ee97
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/88c6ee97
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/88c6ee97
Branch: refs/heads/trunk
Commit: 88c6ee97e0fc8652f2dba16786cb8cbd7b80a1b7
Parents: 8947a09
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 00:29:40 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200
----------------------------------------------------------------------
.../transport/mqtt/MQTTProtocolConverter.java | 50 ++++++++++++--------
1 file changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/88c6ee97/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index cbb6415..71a6fcf 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -361,12 +361,13 @@ public class MQTTProtocolConverter {
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString()));
if( mqttSubscriptionByTopic.containsKey(topicName)) {
- if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) {
+ final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
+ if (topicQoS != mqttSubscription.qos()) {
// remove old subscription as the QoS has changed
onUnSubscribe(topicName);
} else {
// duplicate SUBSCRIBE packet, find all matching topics and resend retained messages
- resendRetainedMessages(topicName, destination);
+ resendRetainedMessages(topicName, destination, mqttSubscription);
return (byte) topicQoS.ordinal();
}
@@ -408,7 +409,8 @@ public class MQTTProtocolConverter {
return qos[0];
}
- private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination) throws MQTTProtocolException {
+ private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination,
+ MQTTSubscription mqttSubscription) throws MQTTProtocolException {
// get TopicRegion
RegionBroker regionBroker;
try {
@@ -418,25 +420,26 @@ public class MQTTProtocolConverter {
}
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+ final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
+ final ConsumerId consumerId = consumerInfo.getConsumerId();
+
+ // use actual client id used to create connection to lookup connection context
+ final String connectionInfoClientId = connectionInfo.getClientId();
+ final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
+
// get all matching Topics
final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
- // find matching MQTT subscription for this client
- final String mqttTopicName = convertActiveMQToMQTT(dest.getName());
- final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(new UTF8Buffer(mqttTopicName));
- if (mqttSubscription != null) {
- // recover retroactive messages for matching subscription
- final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
- final ConsumerId consumerId = consumerInfo.getConsumerId();
- final Subscription subscription = topicRegion.getSubscriptions().get(consumerId);
-
- // use actual client id used to create connection to lookup connection context
- final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfo.getClientId());
- try {
- ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
- } catch (Exception e) {
- throw new MQTTProtocolException("Error recovering retained messages for " +
- mqttTopicName + ": " + e.getMessage(), false, e);
+
+ // recover retroactive messages for matching subscriptions
+ for (Subscription subscription : dest.getConsumers()) {
+ if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
+ try {
+ ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
+ } catch (Exception e) {
+ throw new MQTTProtocolException("Error recovering retained messages for " +
+ dest.getName() + ": " + e.getMessage(), false, e);
+ }
}
}
}
@@ -467,6 +470,15 @@ public class MQTTProtocolConverter {
removeInfo = info.createRemoveCommand();
}
sendToActiveMQ(removeInfo, null);
+
+ // check if the durable sub also needs to be removed
+ if (subs.getConsumerInfo().getSubscriptionName() != null) {
+ RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+ rsi.setConnectionId(connectionId);
+ rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName());
+ rsi.setClientId(connectionInfo.getClientId());
+ sendToActiveMQ(rsi, null);
+ }
}
}