You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:39 UTC

[12/13] git commit: Fixed AMQ-5160, fixed durable subscription retroactive recovery

Fixed AMQ-5160, fixed durable subscription retroactive recovery


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6c859676
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6c859676
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6c859676

Branch: refs/heads/trunk
Commit: 6c859676b3995334e96c16c47653ec72fa70f729
Parents: 42ad103
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Fri May 16 14:21:19 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

----------------------------------------------------------------------
 .../broker/region/DurableTopicSubscription.java | 18 ++++-------
 .../broker/region/PrefetchSubscription.java     |  3 +-
 .../transport/mqtt/MQTTProtocolConverter.java   | 21 ++++++++++++
 .../activemq/transport/mqtt/MQTTTest.java       | 34 ++++++++++++++------
 4 files changed, 55 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index e61a608..4c19c62 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
@@ -120,9 +119,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
         if (active.get() || keepDurableSubsActive) {
             Topic topic = (Topic) destination;
             topic.activate(context, this);
-            if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
-                topic.recoverRetroactiveMessages(context, this);
-            }
             this.enqueueCounter += pending.size();
         } else if (destination.getMessageStore() != null) {
             TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
@@ -172,12 +168,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
                     pending.setMaxAuditDepth(getMaxAuditDepth());
                     pending.setMaxProducersToAudit(getMaxProducersToAudit());
                     pending.start();
-                    // use recovery policy for retroactive topics and consumers
-                    for (Destination destination : durableDestinations.values()) {
-                        Topic topic = (Topic) destination;
-                        if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
-                            topic.recoverRetroactiveMessages(context, this);
-                        }
+                }
+                // use recovery policy every time sub is activated for retroactive topics and consumers
+                for (Destination destination : durableDestinations.values()) {
+                    Topic topic = (Topic) destination;
+                    if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
+                        topic.recoverRetroactiveMessages(context, this);
                     }
                 }
             }
@@ -277,7 +273,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
     }
 
     @Override
-    protected void dispatchPending() throws IOException {
+    public void dispatchPending() throws IOException {
         if (isActive()) {
             super.dispatchPending();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index ff4c0aa..5ba3b53 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -633,7 +633,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
         dispatched.removeAll(references);
     }
 
-    protected void dispatchPending() throws IOException {
+    // made public so it can be used in MQTTProtocolConverter
+    public void dispatchPending() throws IOException {
        synchronized(pendingLock) {
             try {
                 int numberToDispatch = countBeforeFull();

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 88e684e..56f7fbd 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,6 +32,7 @@ import javax.jms.Message;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.PrefetchSubscription;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
@@ -71,6 +74,8 @@ public class MQTTProtocolConverter {
     private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
     private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
     private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE);
+    private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
+
     private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
     private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
 
@@ -317,6 +322,8 @@ public class MQTTProtocolConverter {
                 String[] split = name.split(":", 2);
                 QoS qoS = QoS.valueOf(split[0]);
                 onSubscribe(new Topic(split[1], qoS));
+                // mark this durable subscription as restored by Broker
+                restoredSubs.add(split[1]);
             }
         } catch (IOException e) {
             LOG.warn("Could not restore the MQTT durable subs.", e);
@@ -416,6 +423,12 @@ public class MQTTProtocolConverter {
 
     private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination,
                                         MQTTSubscription mqttSubscription) throws MQTTProtocolException {
+        // check whether the Topic has been recovered in restoreDurableSubs
+        // mark subscription available for recovery for duplicate subscription
+        if (restoredSubs.remove(destination.getPhysicalName())) {
+            return;
+        }
+
         // get TopicRegion
         RegionBroker regionBroker;
         try {
@@ -441,6 +454,11 @@ public class MQTTProtocolConverter {
                 if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
                     try {
                         ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
+                        if (subscription instanceof PrefetchSubscription) {
+                            // request dispatch for prefetch subs
+                            PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
+                            prefetchSubscription.dispatchPending();
+                        }
                     } catch (Exception e) {
                         throw new MQTTProtocolException("Error recovering retained messages for " +
                             dest.getName() + ": " + e.getMessage(), false, e);
@@ -479,6 +497,9 @@ public class MQTTProtocolConverter {
 
             // check if the durable sub also needs to be removed
             if (subs.getConsumerInfo().getSubscriptionName() != null) {
+                // also remove it from restored durable subscriptions set
+                restoredSubs.remove(convertMQTTToActiveMQ(topicName.toString()));
+
                 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
                 rsi.setConnectionId(connectionId);
                 rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName());

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 9c8c9b5..466e6a6 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -526,55 +526,71 @@ public class MQTTTest extends AbstractMQTTTest {
 
     }
 
-    @Test(timeout = 60 * 1000)
+    @Test(timeout = 120 * 1000)
     public void testRetainedMessage() throws Exception {
         addMQTTConnector();
         brokerService.start();
 
         MQTT mqtt = createMQTTConnection();
         mqtt.setKeepAlive((short) 2);
-        mqtt.setCleanSession(true);
 
         final String RETAIN = "RETAIN";
         final String TOPICA = "TopicA";
 
-        final String[] clientIds = { null, "foo" };
+        final String[] clientIds = { null, "foo", "durable" };
         for (String clientId : clientIds) {
 
             mqtt.setClientId(clientId);
-            final BlockingConnection connection = mqtt.blockingConnection();
+            mqtt.setCleanSession(!"durable".equals(clientId));
+
+            BlockingConnection connection = mqtt.blockingConnection();
             connection.connect();
 
             // set retained message and check
             connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
-            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)});
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
             Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNotNull("No retained message for " + clientId, msg);
             assertEquals(RETAIN, new String(msg.getPayload()));
             msg.ack();
+            assertNull(connection.receive(5000, TimeUnit.MILLISECONDS));
 
             // test duplicate subscription
-            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)});
-            msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+            msg = connection.receive(15000, TimeUnit.MILLISECONDS);
             assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
             assertEquals(RETAIN, new String(msg.getPayload()));
             msg.ack();
+            assertNull(connection.receive(5000, TimeUnit.MILLISECONDS));
             connection.unsubscribe(new String[]{"TopicA"});
 
             // clear retained message and check that we don't receive it
             connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
-            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)});
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
             msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNull("Retained message not cleared for " + clientId, msg);
             connection.unsubscribe(new String[]{"TopicA"});
 
             // set retained message again and check
             connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
-            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)});
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
             msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNotNull("No reset retained message for " + clientId, msg);
             assertEquals(RETAIN, new String(msg.getPayload()));
             msg.ack();
+            assertNull(connection.receive(5000, TimeUnit.MILLISECONDS));
+
+            // re-connect and check
+            connection.disconnect();
+            connection = mqtt.blockingConnection();
+            connection.connect();
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+            msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+            assertNotNull("No reset retained message for " + clientId, msg);
+            assertEquals(RETAIN, new String(msg.getPayload()));
+            msg.ack();
+            assertNull(connection.receive(5000, TimeUnit.MILLISECONDS));
+
             connection.unsubscribe(new String[]{"TopicA"});
 
             connection.disconnect();