You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/08/09 19:09:49 UTC

svn commit: r1155437 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Tue Aug  9 17:09:49 2011
New Revision: 1155437

URL: http://svn.apache.org/viewvc?rev=1155437&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3442: Use the real durable subscription key as the MBean name for inactive durable subscriptions, improve metrics, and fix the in-flight count on deactivate. Resolved.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Tue Aug  9 17:09:49 2011
@@ -392,7 +392,7 @@ public class DestinationView implements 
         int index = 0;
         for (Subscription subscription : subscriptions) {
             String connectionClientId = subscription.getContext().getClientId();
-            String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName);
+            String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription.getConsumerInfo(), connectionClientId, objectName);
             answer[index++] = new ObjectName(objectNameStr);
         }
         return answer;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java Tue Aug  9 17:09:49 2011
@@ -21,6 +21,8 @@ import javax.management.openmbean.OpenDa
 import javax.management.openmbean.TabularData;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.SubscriptionInfo;
 
@@ -36,12 +38,12 @@ public class InactiveDurableSubscription
      * 
      * @param broker
      * @param clientId
-     * @param sub
+     * @param subInfo
      */
-    public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo sub) {
-        super(broker,clientId, null);
+    public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
+        super(broker,clientId, subscription);
         this.broker = broker;
-        this.subscriptionInfo = sub;
+        this.subscriptionInfo = subInfo;
     }
 
     /**
@@ -94,6 +96,12 @@ public class InactiveDurableSubscription
         return false;
     }
 
+    @Override
+    protected ConsumerInfo getConsumerInfo() {
+        // when inactive, consumer info is stale
+        return null;
+    }
+
     /**
      * Browse messages for this durable subscriber
      * 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Tue Aug  9 17:09:49 2011
@@ -185,7 +185,7 @@ public class ManagedRegionBroker extends
     public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
         String connectionClientId = context.getClientId();
         ObjectName brokerJmxObjectName = brokerObjectName;
-        String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
+        String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName);
         SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
         try {
             ObjectName objectName = new ObjectName(objectNameStr);
@@ -196,7 +196,7 @@ public class ManagedRegionBroker extends
                 info.setClientId(context.getClientId());
                 info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
                 info.setDestination(sub.getConsumerInfo().getDestination());
-                addInactiveSubscription(key, info);
+                addInactiveSubscription(key, info, sub);
             } else {
                 if (sub.getConsumerInfo().isDurable()) {
                     view = new DurableSubscriptionView(this, context.getClientId(), sub);
@@ -217,21 +217,21 @@ public class ManagedRegionBroker extends
         }
     }
 
-    public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
+    public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) {
         Hashtable map = brokerJmxObjectName.getKeyPropertyList();
         String brokerDomain = brokerJmxObjectName.getDomain();
         String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
-        String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
-        String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
+        String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString();
+        String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName());
         String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
         String persistentMode = "persistentMode=";
         String consumerId = "";
-        if (sub.getConsumerInfo().isDurable()) {
-            persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
+        if (info.isDurable()) {
+            persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName());
         } else {
             persistentMode += "Non-Durable";
-            if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
-                consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
+            if (info.getConsumerId() != null) {
+                consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
             }
         }
         objectNameStr += persistentMode + ",";
@@ -482,7 +482,7 @@ public class ManagedRegionBroker extends
                 info.setClientId(subscriptionKey.getClientId());
                 info.setSubscriptionName(subscriptionKey.getSubscriptionName());
                 info.setDestination(new ActiveMQTopic(view.getDestinationName()));
-                addInactiveSubscription(subscriptionKey, info);
+                addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
             }
         }
     }
@@ -512,7 +512,7 @@ public class ManagedRegionBroker extends
             Map.Entry entry = (Entry)i.next();
             SubscriptionKey key = (SubscriptionKey)entry.getKey();
             SubscriptionInfo info = (SubscriptionInfo)entry.getValue();
-            addInactiveSubscription(key, info);
+            addInactiveSubscription(key, info, null);
         }
     }
 
@@ -525,12 +525,11 @@ public class ManagedRegionBroker extends
         return known;
     }
 
-    protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
-        Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
+    protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
         try {
-            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false,"
-                                                   + "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + "");
-            SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
+            ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
+            ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName));
+            SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
 
             try {
                 AnnotatedMBean.registerMBean(managementContext, view, objectName);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Aug  9 17:09:49 2011
@@ -47,7 +47,7 @@ public class DurableTopicSubscription ex
 
     private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
     private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
-    private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
+    private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     private final SubscriptionKey subscriptionKey;
     private final boolean keepDurableSubsActive;
     private AtomicBoolean active = new AtomicBoolean();
@@ -96,12 +96,14 @@ public class DurableTopicSubscription ex
     }
 
     public void add(ConnectionContext context, Destination destination) throws Exception {
-        super.add(context, destination);
+        if (!destinations.contains(destination)) {
+            super.add(context, destination);
+        }
         // do it just once per destination
-        if (destinations.containsKey(destination.getActiveMQDestination())) {
-            return;
+        if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
+             return;
         }
-        destinations.put(destination.getActiveMQDestination(), destination);
+        durableDestinations.put(destination.getActiveMQDestination(), destination);
 
         if (active.get() || keepDurableSubsActive) {
             Topic topic = (Topic)destination;
@@ -130,7 +132,7 @@ public class DurableTopicSubscription ex
             this.info = info;
             LOG.debug("Activating " + this);
             if (!keepDurableSubsActive) {
-                for (Iterator<Destination> iter = destinations.values()
+                for (Iterator<Destination> iter = durableDestinations.values()
                         .iterator(); iter.hasNext();) {
                     Topic topic = (Topic) iter.next();
                     add(context, topic);
@@ -146,7 +148,7 @@ public class DurableTopicSubscription ex
                 // If nothing was in the persistent store, then try to use the
                 // recovery policy.
                 if (pending.isEmpty()) {
-                    for (Iterator<Destination> iter = destinations.values()
+                    for (Iterator<Destination> iter = durableDestinations.values()
                             .iterator(); iter.hasNext();) {
                         Topic topic = (Topic) iter.next();
                         topic.recoverRetroactiveMessages(context, this);
@@ -168,10 +170,12 @@ public class DurableTopicSubscription ex
         synchronized (pending) {
             pending.stop();
         }
-        if (!keepDurableSubsActive) {
-            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
-                Topic topic = (Topic)iter.next();
+        for (Iterator<Destination> iter = durableDestinations.values().iterator(); iter.hasNext();) {
+            Topic topic = (Topic)iter.next();
+            if (!keepDurableSubsActive) {
                 topic.deactivate(context, this);
+            } else {
+                topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
             }
         }
         
@@ -270,7 +274,7 @@ public class DurableTopicSubscription ex
 
     
     public synchronized String toString() {
-        return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
+        return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + enqueueCounter + ", pending="
                + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Aug  9 17:09:49 2011
@@ -259,7 +259,7 @@ public class TopicRegion extends Abstrac
         return rc;
     }
 
-    private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
+    public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
         ConsumerInfo rc = new ConsumerInfo();
         rc.setSelector(info.getSelector());
         rc.setSubscriptionName(info.getSubscriptionName());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Tue Aug  9 17:09:49 2011
@@ -261,16 +261,13 @@ public abstract class AbstractStoreCurso
             this.batchResetNeeded = false;
         }
         if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
-            this.storeHasMessages = false;
             try {
                 doFillBatch();
             } catch (Exception e) {
                 LOG.error(this + " - Failed to fill batch", e);
                 throw new RuntimeException(e);
             }
-            if (!this.batchList.isEmpty() || !hadSpace) {
-                this.storeHasMessages=true;
-            }
+            this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace;
         }
     }
     

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1155437&r1=1155436&r2=1155437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Tue Aug  9 17:09:49 2011
@@ -25,10 +25,13 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.management.ObjectName;
 import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -44,12 +47,15 @@ public class DurableSubscriptionOfflineT
     private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
     public boolean usePrioritySupport = Boolean.TRUE;
     public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+    public boolean keepDurableSubsActive = true;
     private BrokerService broker;
     private ActiveMQTopic topic;
     private Vector<Exception> exceptions = new Vector<Exception>();
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory("vm://" + getName(true));
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
+        connectionFactory.setWatchTopicAdvisories(false);
+        return connectionFactory;
     }
 
     @Override
@@ -89,6 +95,8 @@ public class DurableSubscriptionOfflineT
         broker.setBrokerName(getName(true));
         broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         broker.getManagementContext().setCreateConnector(false);
+        broker.setAdvisorySupport(false);
+        broker.setKeepDurableSubsActive(keepDurableSubsActive);
 
         if (usePrioritySupport) {
             PolicyEntry policy = new PolicyEntry();
@@ -322,6 +330,119 @@ public class DurableSubscriptionOfflineT
         assertEquals("offline consumer got all", sent, listener.count);
     }
 
+    public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
+        this.addCombinationValues("keepDurableSubsActive",
+                new Object[]{Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testJMXCountersWithOfflineSubs() throws Exception {
+        // create durable subscription 1
+        Connection con = createConnection("cliId1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        // restart broker
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            producer.send(topic, message);
+        }
+        session.close();
+        con.close();
+
+        // consume some messages
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+        for (int i=0; i<sent/2; i++) {
+            Message m =  consumer.receive(4000);
+            assertNotNull("got message: " + i, m);
+            LOG.info("Got :" + i + ", " + m);
+        }
+
+        // check some counters while active
+        ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+        LOG.info("active durable sub name: " + activeDurableSubName);
+        final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+        assertTrue("is active", durableSubscriptionView.isActive());
+        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
+        assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
+            }
+        }));
+        assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
+
+
+        ObjectName destinationName = broker.getAdminView().getTopics()[0];
+        TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
+        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+        assertEquals("inflight", 5, topicView.getInFlightCount());
+
+        session.close();
+        con.close();
+
+        // check some counters when inactive
+        ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
+        LOG.info("inactive durable sub name: " + inActiveDurableSubName);
+        DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+        assertTrue("is not active", !durableSubscriptionView1.isActive());
+        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
+        assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
+        assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
+
+        // destination view
+        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+        assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
+
+        // consume the rest
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+        for (int i=0; i<sent/2;i++) {
+            Message m =  consumer.receive(30000);
+            assertNotNull("got message: " + i, m);
+            LOG.info("Got :" + i + ", " + m);
+        }
+
+        activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+        LOG.info("durable sub name: " + activeDurableSubName);
+        final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+        assertTrue("is active", durableSubscriptionView2.isActive());
+        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
+        assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                long val = durableSubscriptionView2.getDequeueCounter();
+                LOG.info("dequeue count:" + val);
+                return 10 == val;
+            }
+        }));
+
+    }
+
     public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
                 new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
@@ -1062,6 +1183,8 @@ public class DurableSubscriptionOfflineT
     public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
         this.addCombinationValues("journalMaxFileLength",
                 new Object[]{new Integer(64 * 1024)});
+        this.addCombinationValues("keepDurableSubsActive",
+                new Object[]{Boolean.TRUE, Boolean.FALSE});
     }
 
     // https://issues.apache.org/jira/browse/AMQ-3206