You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/15 20:57:54 UTC

svn commit: r1503422 [2/2] - /activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1503422&r1=1503421&r2=1503422&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Mon Jul 15 18:57:54 2013
@@ -17,9 +17,7 @@
 package org.apache.activemq.usecases;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -31,15 +29,12 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.management.ObjectName;
 
 import junit.framework.Test;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
-import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -47,7 +42,6 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,992 +132,992 @@ public class DurableSubscriptionOfflineT
             broker.stop();
     }
 
-    public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-        this.addCombinationValues("usePrioritySupport",
-                new Object[]{ Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testConsumeOnlyMatchedMessages() throws Exception {
-        // create durable subscription
-        Connection con = createConnection();
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            boolean filter = i % 2 == 1;
-            if (filter)
-                sent++;
-
-            Message message = session.createMessage();
-            message.setStringProperty("filter", filter ? "true" : "false");
-            producer.send(topic, message);
-        }
-
-        session.close();
-        con.close();
-
-        // consume messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        assertEquals(sent, listener.count);
-    }
-
-     public void testConsumeAllMatchedMessages() throws Exception {
-         // create durable subscription
-         Connection con = createConnection();
-         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         session.close();
-         con.close();
-
-         // send messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(null);
-
-         int sent = 0;
-         for (int i = 0; i < 10; i++) {
-             sent++;
-             Message message = session.createMessage();
-             message.setStringProperty("filter", "true");
-             producer.send(topic, message);
-         }
-
-         Thread.sleep(1 * 1000);
-
-         session.close();
-         con.close();
-
-         // consume messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         Listener listener = new Listener();
-         consumer.setMessageListener(listener);
-
-         Thread.sleep(3 * 1000);
-
-         session.close();
-         con.close();
-
-         assertEquals(sent, listener.count);
-     }
-
-    public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-               new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-        this.addCombinationValues("usePrioritySupport",
-                new Object[]{ Boolean.TRUE, Boolean.FALSE});
-    }
-
-     public void testVerifyAllConsumedAreAcked() throws Exception {
-         // create durable subscription
-         Connection con = createConnection();
-         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         session.close();
-         con.close();
-
-         // send messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(null);
-
-         int sent = 0;
-         for (int i = 0; i < 10; i++) {
-             sent++;
-             Message message = session.createMessage();
-             message.setStringProperty("filter", "true");
-             producer.send(topic, message);
-         }
-
-         Thread.sleep(1 * 1000);
-
-         session.close();
-         con.close();
-
-         // consume messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         Listener listener = new Listener();
-         consumer.setMessageListener(listener);
-
-         Thread.sleep(3 * 1000);
-
-         session.close();
-         con.close();
-
-         LOG.info("Consumed: " + listener.count);
-         assertEquals(sent, listener.count);
-
-         // consume messages again, should not get any
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         listener = new Listener();
-         consumer.setMessageListener(listener);
-
-         Thread.sleep(3 * 1000);
-
-         session.close();
-         con.close();
-
-         assertEquals(0, listener.count);
-     }
-
-    public void testTwoOfflineSubscriptionCanConsume() throws Exception {
-        // create durable subscription 1
-        Connection con = createConnection("cliId1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // create durable subscription 2
-        Connection con2 = createConnection("cliId2");
-        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener2 = new Listener();
-        consumer2.setMessageListener(listener2);
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // test online subs
-        Thread.sleep(3 * 1000);
-        session2.close();
-        con2.close();
-
-        assertEquals(sent, listener2.count);
-
-        // consume messages
-        con = createConnection("cliId1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        assertEquals("offline consumer got all", sent, listener.count);
-    }
-
-    public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
-        this.addCombinationValues("keepDurableSubsActive",
-                new Object[]{Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testJMXCountersWithOfflineSubs() throws Exception {
-        // create durable subscription 1
-        Connection con = createConnection("cliId1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", null, true);
-        session.close();
-        con.close();
-
-        // restart broker
-        broker.stop();
-        createBroker(false /*deleteAllMessages*/);
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            producer.send(topic, message);
-        }
-        session.close();
-        con.close();
-
-        // consume some messages
-        con = createConnection("cliId1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-
-        for (int i=0; i<sent/2; i++) {
-            Message m =  consumer.receive(4000);
-            assertNotNull("got message: " + i, m);
-            LOG.info("Got :" + i + ", " + m);
-        }
-
-        // check some counters while active
-        ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
-        LOG.info("active durable sub name: " + activeDurableSubName);
-        final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
-                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
-
-        assertTrue("is active", durableSubscriptionView.isActive());
-        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
-        assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
-            }
-        }));
-        assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
-
-
-        ObjectName destinationName = broker.getAdminView().getTopics()[0];
-        TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
-        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
-        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
-        assertEquals("inflight", 5, topicView.getInFlightCount());
-
-        session.close();
-        con.close();
-
-        // check some counters when inactive
-        ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
-        LOG.info("inactive durable sub name: " + inActiveDurableSubName);
-        DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
-                broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
-
-        assertTrue("is not active", !durableSubscriptionView1.isActive());
-        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
-        assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
-        assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
-
-        // destination view
-        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
-        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
-        assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
-
-        // consume the rest
-        con = createConnection("cliId1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-
-        for (int i=0; i<sent/2;i++) {
-            Message m =  consumer.receive(30000);
-            assertNotNull("got message: " + i, m);
-            LOG.info("Got :" + i + ", " + m);
-        }
-
-        activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
-        LOG.info("durable sub name: " + activeDurableSubName);
-        final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
-                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
-
-        assertTrue("is active", durableSubscriptionView2.isActive());
-        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
-        assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                long val = durableSubscriptionView2.getDequeueCounter();
-                LOG.info("dequeue count:" + val);
-                return 10 == val;
-            }
-        }));
-    }
-
-    public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-        this.addCombinationValues("usePrioritySupport",
-                new Object[]{ Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        con = createConnection("offCli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        Connection con2 = createConnection("onlineCli1");
-        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener2 = new Listener();
-        consumer2.setMessageListener(listener2);
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // test online subs
-        Thread.sleep(3 * 1000);
-        session2.close();
-        con2.close();
-        assertEquals(sent, listener2.count);
-
-        // restart broker
-        broker.stop();
-        createBroker(false /*deleteAllMessages*/);
-
-        // test offline
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-
-        Connection con3 = createConnection("offCli2");
-        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-        Listener listener3 = new Listener();
-        consumer3.setMessageListener(listener3);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-        session3.close();
-        con3.close();
-
-        assertEquals(sent, listener.count);
-        assertEquals(sent, listener3.count);
-    }
-
-    public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-    }
-
-    public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
-        // create durable subscription 1
-        Connection con = createConnection("cliId1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-
-        // create durable subscription 2
-        Connection con2 = createConnection("cliId2");
-        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener2 = new Listener();
-        consumer2.setMessageListener(listener2);
-
-        assertEquals(0, listener2.count);
-        session2.close();
-        con2.close();
-
-        // send some more
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        con2 = createConnection("cliId2");
-        session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        listener2 = new Listener("cliId2");
-        consumer2.setMessageListener(listener2);
-        // test online subs
-        Thread.sleep(3 * 1000);
-
-        assertEquals(10, listener2.count);
-
-        // consume all messages
-        con = createConnection("cliId1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener("cliId1");
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        assertEquals("offline consumer got all", sent, listener.count);
-    }
-
-    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-    }
-
-    private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
-    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
-        // create offline subs 1
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", filter, true);
-        session.close();
-        con.close();
-
-        // create offline subs 2
-        con = createConnection("offCli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", filter, true);
-        session.close();
-        con.close();
-
-        // create online subs
-        Connection con2 = createConnection("onlineCli1");
-        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener2 = new Listener();
-        consumer2.setMessageListener(listener2);
-
-        // create non-durable consumer
-        Connection con4 = createConnection("nondurableCli");
-        Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
-        Listener listener4 = new Listener();
-        consumer4.setMessageListener(listener4);
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        boolean hasRelevant = false;
-        int filtered = 0;
-        for (int i = 0; i < 100; i++) {
-            int postf = (int) (Math.random() * 9) + 1;
-            String d = "D" + postf;
-
-            if ("D1".equals(d) || "D2".equals(d)) {
-                hasRelevant = true;
-                filtered++;
-            }
-
-            Message message = session.createMessage();
-            message.setStringProperty("$a", "A1");
-            message.setStringProperty("$d", d);
-            producer.send(topic, message);
-        }
-
-        Message message = session.createMessage();
-        message.setStringProperty("$a", "A1");
-        message.setBooleanProperty("$b", true);
-        message.setBooleanProperty("$c", hasRelevant);
-        producer.send(topic, message);
-
-        if (hasRelevant)
-            filtered++;
-
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        Thread.sleep(3 * 1000);
-
-        // test non-durable consumer
-        session4.close();
-        con4.close();
-        assertEquals(filtered, listener4.count); // succeeded!
-
-        // test online subs
-        session2.close();
-        con2.close();
-        assertEquals(filtered, listener2.count); // succeeded!
-
-        // test offline 1
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener = new FilterCheckListener();
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-        session.close();
-        con.close();
-
-        assertEquals(filtered, listener.count);
-
-        // test offline 2
-        Connection con3 = createConnection("offCli2");
-        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener3 = new FilterCheckListener();
-        consumer3.setMessageListener(listener3);
-
-        Thread.sleep(3 * 1000);
-        session3.close();
-        con3.close();
-
-        assertEquals(filtered, listener3.count);
-        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
-    }
-
-    public void testRemovedDurableSubDeletes() throws Exception {
-        // create durable subscription 1
-        Connection con = createConnection("cliId1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        for (int i = 0; i < 10; i++) {
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-
-        Connection con2 = createConnection("cliId1");
-        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session2.unsubscribe("SubsId");
-        session2.close();
-        con2.close();
-
-        // see if retroactive can consumer any
-        topic = new ActiveMQTopic(topic.getPhysicalName() + "?consumer.retroactive=true");
-        con = createConnection("offCli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-        session.close();
-        con.close();
-        assertEquals(0, listener.count);
-    }
-
-    public void testRemovedDurableSubDeletesFromIndex() throws Exception {
-
-        if (! (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
-            return;
-        }
-
-        final int numMessages = 2750;
-
-        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
-        PageFile pageFile = kahaDBPersistenceAdapter.getStore().getPageFile();
-        LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + ", fileSize:" + pageFile.getFile().length());
-
-        long lastDiff = 0;
-        for (int repeats=0; repeats<2; repeats++) {
-
-            LOG.info("Iteration: "+ repeats  + " Count:" + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount());
-
-            Connection con = createConnection("cliId1" + "-" + repeats);
-            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-            session.close();
-            con.close();
-
-            // send messages
-            con = createConnection();
-            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageProducer producer = session.createProducer(null);
-
-            for (int i = 0; i < numMessages; i++) {
-                Message message = session.createMessage();
-                message.setStringProperty("filter", "true");
-                producer.send(topic, message);
-            }
-            con.close();
-
-            Connection con2 = createConnection("cliId1" + "-" + repeats);
-            Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            session2.unsubscribe("SubsId");
-            session2.close();
-            con2.close();
-
-            LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() +  " diff: " + (pageFile.getPageCount() - pageFile.getFreePageCount()) + " fileSize:" + pageFile.getFile().length());
-
-            if (lastDiff != 0) {
-                assertEquals("Only use X pages per iteration: " + repeats, lastDiff, pageFile.getPageCount() - pageFile.getFreePageCount());
-            }
-            lastDiff = pageFile.getPageCount() - pageFile.getFreePageCount();
-        }
-    }
-
-    public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-    }
-
-    public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
-
-        if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
-            // https://issues.apache.org/jira/browse/AMQ-4296
-            return;
-        }
-
-        // create offline subs 1
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // create offline subs 2
-        con = createConnection("offCli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int filtered = 0;
-        for (int i = 0; i < 10; i++) {
-            boolean filter = (int) (Math.random() * 2) >= 1;
-            if (filter)
-                filtered++;
-
-            Message message = session.createMessage();
-            message.setStringProperty("filter", filter ? "true" : "false");
-            producer.send(topic, message);
-        }
-
-        LOG.info("sent: " + filtered);
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // restart broker
-        Thread.sleep(3 * 1000);
-        broker.stop();
-        createBroker(false /*deleteAllMessages*/);
-
-        // send more messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        producer = session.createProducer(null);
-
-        for (int i = 0; i < 10; i++) {
-            boolean filter = (int) (Math.random() * 2) >= 1;
-            if (filter)
-                filtered++;
-
-            Message message = session.createMessage();
-            message.setStringProperty("filter", filter ? "true" : "false");
-            producer.send(topic, message);
-        }
-
-        LOG.info("after restart, total sent with filter='true': " + filtered);
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // test offline subs
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener("1>");
-        consumer.setMessageListener(listener);
-
-        Connection con3 = createConnection("offCli2");
-        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener3 = new Listener();
-        consumer3.setMessageListener(listener3);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-        session3.close();
-        con3.close();
-
-        assertEquals(filtered, listener.count);
-        assertEquals(filtered, listener3.count);
-    }
-
-    public void initCombosForTestOfflineAfterRestart() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-    }
-
-    public void testOfflineSubscriptionAfterRestart() throws Exception {
-        // create offline subs 1
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-
-        // send messages
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "false");
-            producer.send(topic, message);
-        }
-
-        LOG.info("sent: " + sent);
-        Thread.sleep(5 * 1000);
-        session.close();
-        con.close();
-
-        assertEquals(sent, listener.count);
-
-        // restart broker
-        Thread.sleep(3 * 1000);
-        broker.stop();
-        createBroker(false /*deleteAllMessages*/);
-
-        // send more messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        producer = session.createProducer(null);
-
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "false");
-            producer.send(topic, message);
-        }
-
-        LOG.info("after restart, sent: " + sent);
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // test offline subs
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        assertEquals(sent, listener.count);
-    }
-
-    public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
-        // create offline subs 1
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // create offline subs 2
-        con = createConnection("offCli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", null, true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            boolean filter = (int) (Math.random() * 2) >= 1;
-
-            sent++;
-
-            Message message = session.createMessage();
-            message.setStringProperty("filter", filter ? "true" : "false");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-
-        Connection con2 = createConnection("offCli1");
-        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session2.unsubscribe("SubsId");
-        session2.close();
-        con2.close();
-
-        // consume all messages
-        con = createConnection("offCli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-        Listener listener = new Listener("SubsId");
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        assertEquals("offline consumer got all", sent, listener.count);
-    }
-
-    public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
-        final int messageCount = 1000;
-        Connection con = null;
-        Session session = null;
-        final int numConsumers = 10;
-        for (int i = 0; i <= numConsumers; i++) {
-            con = createConnection("cli" + i);
-            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            session.createDurableSubscriber(topic, "SubsId", null, true);
-            session.close();
-            con.close();
-        }
-
-        class CheckForDupsClient implements Runnable {
-            HashSet<Long> ids = new HashSet<Long>();
-            final int id;
-
-            public CheckForDupsClient(int id) {
-                this.id = id;
-            }
-
-            @Override
-            public void run() {
-                try {
-                    Connection con = createConnection("cli" + id);
-                    Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                    for (int j=0;j<2;j++) {
-                        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-                        for (int i = 0; i < messageCount/2; i++) {
-                            Message message = consumer.receive(4000);
-                            assertNotNull(message);
-                            long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
-                            assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
-                        }
-                        consumer.close();
-                    }
-
-                    // verify no duplicates left
-                    MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-                    Message message = consumer.receive(4000);
-                    if (message != null) {
-                        long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
-                        assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
-                    }
-                    assertNull(message);
-
-
-                    session.close();
-                    con.close();
-                } catch (Throwable e) {
-                    e.printStackTrace();
-                    exceptions.add(e);
-                }
-            }
-        }
-
-        final String payLoad = new String(new byte[1000]);
-        con = createConnection();
-        final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED);
-        MessageProducer producer = sendSession.createProducer(topic);
-        for (int i = 0; i < messageCount; i++) {
-            producer.send(sendSession.createTextMessage(payLoad));
-        }
-
-        ExecutorService executorService = Executors.newCachedThreadPool();
-
-        // concurrent commit and activate
-        executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    sendSession.commit();
-                } catch (JMSException e) {
-                    e.printStackTrace();
-                    exceptions.add(e);
-                }
-            }
-        });
-        for (int i = 0; i < numConsumers; i++) {
-            executorService.execute(new CheckForDupsClient(i));
-        }
-
-        executorService.shutdown();
-        executorService.awaitTermination(5, TimeUnit.MINUTES);
-        con.close();
-
-        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-    }
-
-    public void testOrderOnActivateDeactivate() throws Exception {
-        for (int i=0;i<10;i++) {
-            LOG.info("Iteration: " + i);
-            doTestOrderOnActivateDeactivate();
-            broker.stop();
-            createBroker(true /*deleteAllMessages*/);
-        }
-    }
+//    public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
+//        this.addCombinationValues("defaultPersistenceAdapter",
+//                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+//        this.addCombinationValues("usePrioritySupport",
+//                new Object[]{ Boolean.TRUE, Boolean.FALSE});
+//    }
+//
+//    public void testConsumeOnlyMatchedMessages() throws Exception {
+//        // create durable subscription
+//        Connection con = createConnection();
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        session.close();
+//        con.close();
+//
+//        // send messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageProducer producer = session.createProducer(null);
+//
+//        int sent = 0;
+//        for (int i = 0; i < 10; i++) {
+//            boolean filter = i % 2 == 1;
+//            if (filter)
+//                sent++;
+//
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", filter ? "true" : "false");
+//            producer.send(topic, message);
+//        }
+//
+//        session.close();
+//        con.close();
+//
+//        // consume messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        Listener listener = new Listener();
+//        consumer.setMessageListener(listener);
+//
+//        Thread.sleep(3 * 1000);
+//
+//        session.close();
+//        con.close();
+//
+//        assertEquals(sent, listener.count);
+//    }
+//
+//     public void testConsumeAllMatchedMessages() throws Exception {
+//         // create durable subscription
+//         Connection con = createConnection();
+//         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//         session.close();
+//         con.close();
+//
+//         // send messages
+//         con = createConnection();
+//         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         MessageProducer producer = session.createProducer(null);
+//
+//         int sent = 0;
+//         for (int i = 0; i < 10; i++) {
+//             sent++;
+//             Message message = session.createMessage();
+//             message.setStringProperty("filter", "true");
+//             producer.send(topic, message);
+//         }
+//
+//         Thread.sleep(1 * 1000);
+//
+//         session.close();
+//         con.close();
+//
+//         // consume messages
+//         con = createConnection();
+//         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//         Listener listener = new Listener();
+//         consumer.setMessageListener(listener);
+//
+//         Thread.sleep(3 * 1000);
+//
+//         session.close();
+//         con.close();
+//
+//         assertEquals(sent, listener.count);
+//     }
+//
+//    public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
+//        this.addCombinationValues("defaultPersistenceAdapter",
+//               new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+//        this.addCombinationValues("usePrioritySupport",
+//                new Object[]{ Boolean.TRUE, Boolean.FALSE});
+//    }
+//
+//     public void testVerifyAllConsumedAreAcked() throws Exception {
+//         // create durable subscription
+//         Connection con = createConnection();
+//         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//         session.close();
+//         con.close();
+//
+//         // send messages
+//         con = createConnection();
+//         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         MessageProducer producer = session.createProducer(null);
+//
+//         int sent = 0;
+//         for (int i = 0; i < 10; i++) {
+//             sent++;
+//             Message message = session.createMessage();
+//             message.setStringProperty("filter", "true");
+//             producer.send(topic, message);
+//         }
+//
+//         Thread.sleep(1 * 1000);
+//
+//         session.close();
+//         con.close();
+//
+//         // consume messages
+//         con = createConnection();
+//         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//         Listener listener = new Listener();
+//         consumer.setMessageListener(listener);
+//
+//         Thread.sleep(3 * 1000);
+//
+//         session.close();
+//         con.close();
+//
+//         LOG.info("Consumed: " + listener.count);
+//         assertEquals(sent, listener.count);
+//
+//         // consume messages again, should not get any
+//         con = createConnection();
+//         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//         listener = new Listener();
+//         consumer.setMessageListener(listener);
+//
+//         Thread.sleep(3 * 1000);
+//
+//         session.close();
+//         con.close();
+//
+//         assertEquals(0, listener.count);
+//     }
+//
+//    public void testTwoOfflineSubscriptionCanConsume() throws Exception {
+//        // create durable subscription 1
+//        Connection con = createConnection("cliId1");
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        session.close();
+//        con.close();
+//
+//        // create durable subscription 2
+//        Connection con2 = createConnection("cliId2");
+//        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        Listener listener2 = new Listener();
+//        consumer2.setMessageListener(listener2);
+//
+//        // send messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageProducer producer = session.createProducer(null);
+//
+//        int sent = 0;
+//        for (int i = 0; i < 10; i++) {
+//            sent++;
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", "true");
+//            producer.send(topic, message);
+//        }
+//
+//        Thread.sleep(1 * 1000);
+//        session.close();
+//        con.close();
+//
+//        // test online subs
+//        Thread.sleep(3 * 1000);
+//        session2.close();
+//        con2.close();
+//
+//        assertEquals(sent, listener2.count);
+//
+//        // consume messages
+//        con = createConnection("cliId1");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        Listener listener = new Listener();
+//        consumer.setMessageListener(listener);
+//
+//        Thread.sleep(3 * 1000);
+//
+//        session.close();
+//        con.close();
+//
+//        assertEquals("offline consumer got all", sent, listener.count);
+//    }
+//
+//    public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
+//        this.addCombinationValues("keepDurableSubsActive",
+//                new Object[]{Boolean.TRUE, Boolean.FALSE});
+//    }
+//
+//    public void testJMXCountersWithOfflineSubs() throws Exception {
+//        // create durable subscription 1
+//        Connection con = createConnection("cliId1");
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", null, true);
+//        session.close();
+//        con.close();
+//
+//        // restart broker
+//        broker.stop();
+//        createBroker(false /*deleteAllMessages*/);
+//
+//        // send messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageProducer producer = session.createProducer(null);
+//
+//        int sent = 0;
+//        for (int i = 0; i < 10; i++) {
+//            sent++;
+//            Message message = session.createMessage();
+//            producer.send(topic, message);
+//        }
+//        session.close();
+//        con.close();
+//
+//        // consume some messages
+//        con = createConnection("cliId1");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+//
+//        for (int i=0; i<sent/2; i++) {
+//            Message m =  consumer.receive(4000);
+//            assertNotNull("got message: " + i, m);
+//            LOG.info("Got :" + i + ", " + m);
+//        }
+//
+//        // check some counters while active
+//        ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+//        LOG.info("active durable sub name: " + activeDurableSubName);
+//        final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
+//                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+//
+//        assertTrue("is active", durableSubscriptionView.isActive());
+//        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
+//        assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
+//            @Override
+//            public boolean isSatisified() throws Exception {
+//                return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
+//            }
+//        }));
+//        assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
+//
+//
+//        ObjectName destinationName = broker.getAdminView().getTopics()[0];
+//        TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
+//        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+//        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+//        assertEquals("inflight", 5, topicView.getInFlightCount());
+//
+//        session.close();
+//        con.close();
+//
+//        // check some counters when inactive
+//        ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
+//        LOG.info("inactive durable sub name: " + inActiveDurableSubName);
+//        DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
+//                broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
+//
+//        assertTrue("is not active", !durableSubscriptionView1.isActive());
+//        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
+//        assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
+//        assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
+//
+//        // destination view
+//        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+//        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+//        assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
+//
+//        // consume the rest
+//        con = createConnection("cliId1");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+//
+//        for (int i=0; i<sent/2;i++) {
+//            Message m =  consumer.receive(30000);
+//            assertNotNull("got message: " + i, m);
+//            LOG.info("Got :" + i + ", " + m);
+//        }
+//
+//        activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+//        LOG.info("durable sub name: " + activeDurableSubName);
+//        final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
+//                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+//
+//        assertTrue("is active", durableSubscriptionView2.isActive());
+//        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
+//        assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
+//            @Override
+//            public boolean isSatisified() throws Exception {
+//                long val = durableSubscriptionView2.getDequeueCounter();
+//                LOG.info("dequeue count:" + val);
+//                return 10 == val;
+//            }
+//        }));
+//    }
+//
+//    public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+//        this.addCombinationValues("defaultPersistenceAdapter",
+//                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+//        this.addCombinationValues("usePrioritySupport",
+//                new Object[]{ Boolean.TRUE, Boolean.FALSE});
+//    }
+//
+//    public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+//        Connection con = createConnection("offCli1");
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        session.close();
+//        con.close();
+//
+//        con = createConnection("offCli2");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        session.close();
+//        con.close();
+//
+//        Connection con2 = createConnection("onlineCli1");
+//        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        Listener listener2 = new Listener();
+//        consumer2.setMessageListener(listener2);
+//
+//        // send messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageProducer producer = session.createProducer(null);
+//
+//        int sent = 0;
+//        for (int i = 0; i < 10; i++) {
+//            sent++;
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", "true");
+//            producer.send(topic, message);
+//        }
+//
+//        Thread.sleep(1 * 1000);
+//        session.close();
+//        con.close();
+//
+//        // test online subs
+//        Thread.sleep(3 * 1000);
+//        session2.close();
+//        con2.close();
+//        assertEquals(sent, listener2.count);
+//
+//        // restart broker
+//        broker.stop();
+//        createBroker(false /*deleteAllMessages*/);
+//
+//        // test offline
+//        con = createConnection("offCli1");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//
+//        Connection con3 = createConnection("offCli2");
+//        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//
+//        Listener listener = new Listener();
+//        consumer.setMessageListener(listener);
+//        Listener listener3 = new Listener();
+//        consumer3.setMessageListener(listener3);
+//
+//        Thread.sleep(3 * 1000);
+//
+//        session.close();
+//        con.close();
+//        session3.close();
+//        con3.close();
+//
+//        assertEquals(sent, listener.count);
+//        assertEquals(sent, listener3.count);
+//    }
+//
+//    public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
+//        this.addCombinationValues("defaultPersistenceAdapter",
+//                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+//    }
+//
+//    public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
+//        // create durable subscription 1
+//        Connection con = createConnection("cliId1");
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        session.close();
+//        con.close();
+//
+//        // send messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageProducer producer = session.createProducer(null);
+//
+//        int sent = 0;
+//        for (int i = 0; i < 10; i++) {
+//            sent++;
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", "true");
+//            producer.send(topic, message);
+//        }
+//
+//        Thread.sleep(1 * 1000);
+//
+//        // create durable subscription 2
+//        Connection con2 = createConnection("cliId2");
+//        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        Listener listener2 = new Listener();
+//        consumer2.setMessageListener(listener2);
+//
+//        assertEquals(0, listener2.count);
+//        session2.close();
+//        con2.close();
+//
+//        // send some more
+//        for (int i = 0; i < 10; i++) {
+//            sent++;
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", "true");
+//            producer.send(topic, message);
+//        }
+//
+//        Thread.sleep(1 * 1000);
+//        session.close();
+//        con.close();
+//
+//        con2 = createConnection("cliId2");
+//        session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        listener2 = new Listener("cliId2");
+//        consumer2.setMessageListener(listener2);
+//        // test online subs
+//        Thread.sleep(3 * 1000);
+//
+//        assertEquals(10, listener2.count);
+//
+//        // consume all messages
+//        con = createConnection("cliId1");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        Listener listener = new Listener("cliId1");
+//        consumer.setMessageListener(listener);
+//
+//        Thread.sleep(3 * 1000);
+//
+//        session.close();
+//        con.close();
+//
+//        assertEquals("offline consumer got all", sent, listener.count);
+//    }
+//
+//    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
+//        this.addCombinationValues("defaultPersistenceAdapter",
+//                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+//    }
+//
+//    private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
+//    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
+//        // create offline subs 1
+//        Connection con = createConnection("offCli1");
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", filter, true);
+//        session.close();
+//        con.close();
+//
+//        // create offline subs 2
+//        con = createConnection("offCli2");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", filter, true);
+//        session.close();
+//        con.close();
+//
+//        // create online subs
+//        Connection con2 = createConnection("onlineCli1");
+//        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
+//        Listener listener2 = new Listener();
+//        consumer2.setMessageListener(listener2);
+//
+//        // create non-durable consumer
+//        Connection con4 = createConnection("nondurableCli");
+//        Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
+//        Listener listener4 = new Listener();
+//        consumer4.setMessageListener(listener4);
+//
+//        // send messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageProducer producer = session.createProducer(null);
+//
+//        boolean hasRelevant = false;
+//        int filtered = 0;
+//        for (int i = 0; i < 100; i++) {
+//            int postf = (int) (Math.random() * 9) + 1;
+//            String d = "D" + postf;
+//
+//            if ("D1".equals(d) || "D2".equals(d)) {
+//                hasRelevant = true;
+//                filtered++;
+//            }
+//
+//            Message message = session.createMessage();
+//            message.setStringProperty("$a", "A1");
+//            message.setStringProperty("$d", d);
+//            producer.send(topic, message);
+//        }
+//
+//        Message message = session.createMessage();
+//        message.setStringProperty("$a", "A1");
+//        message.setBooleanProperty("$b", true);
+//        message.setBooleanProperty("$c", hasRelevant);
+//        producer.send(topic, message);
+//
+//        if (hasRelevant)
+//            filtered++;
+//
+//        Thread.sleep(1 * 1000);
+//        session.close();
+//        con.close();
+//
+//        Thread.sleep(3 * 1000);
+//
+//        // test non-durable consumer
+//        session4.close();
+//        con4.close();
+//        assertEquals(filtered, listener4.count); // succeeded!
+//
+//        // test online subs
+//        session2.close();
+//        con2.close();
+//        assertEquals(filtered, listener2.count); // succeeded!
+//
+//        // test offline 1
+//        con = createConnection("offCli1");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+//        Listener listener = new FilterCheckListener();
+//        consumer.setMessageListener(listener);
+//
+//        Thread.sleep(3 * 1000);
+//        session.close();
+//        con.close();
+//
+//        assertEquals(filtered, listener.count);
+//
+//        // test offline 2
+//        Connection con3 = createConnection("offCli2");
+//        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
+//        Listener listener3 = new FilterCheckListener();
+//        consumer3.setMessageListener(listener3);
+//
+//        Thread.sleep(3 * 1000);
+//        session3.close();
+//        con3.close();
+//
+//        assertEquals(filtered, listener3.count);
+//        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
+//    }
+//
+//    public void testRemovedDurableSubDeletes() throws Exception {
+//        // create durable subscription 1
+//        Connection con = createConnection("cliId1");
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        session.close();
+//        con.close();
+//
+//        // send messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageProducer producer = session.createProducer(null);
+//
+//        for (int i = 0; i < 10; i++) {
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", "true");
+//            producer.send(topic, message);
+//        }
+//
+//        Thread.sleep(1 * 1000);
+//
+//        Connection con2 = createConnection("cliId1");
+//        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session2.unsubscribe("SubsId");
+//        session2.close();
+//        con2.close();
+//
+//        // see if retroactive can consumer any
+//        topic = new ActiveMQTopic(topic.getPhysicalName() + "?consumer.retroactive=true");
+//        con = createConnection("offCli2");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+//        Listener listener = new Listener();
+//        consumer.setMessageListener(listener);
+//        session.close();
+//        con.close();
+//        assertEquals(0, listener.count);
+//    }
+//
+//    public void testRemovedDurableSubDeletesFromIndex() throws Exception {
+//
+//        if (! (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
+//            return;
+//        }
+//
+//        final int numMessages = 2750;
+//
+//        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+//        PageFile pageFile = kahaDBPersistenceAdapter.getStore().getPageFile();
+//        LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + ", fileSize:" + pageFile.getFile().length());
+//
+//        long lastDiff = 0;
+//        for (int repeats=0; repeats<2; repeats++) {
+//
+//            LOG.info("Iteration: "+ repeats  + " Count:" + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount());
+//
+//            Connection con = createConnection("cliId1" + "-" + repeats);
+//            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//            session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//            session.close();
+//            con.close();
+//
+//            // send messages
+//            con = createConnection();
+//            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//            MessageProducer producer = session.createProducer(null);
+//
+//            for (int i = 0; i < numMessages; i++) {
+//                Message message = session.createMessage();
+//                message.setStringProperty("filter", "true");
+//                producer.send(topic, message);
+//            }
+//            con.close();
+//
+//            Connection con2 = createConnection("cliId1" + "-" + repeats);
+//            Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//            session2.unsubscribe("SubsId");
+//            session2.close();
+//            con2.close();
+//
+//            LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() +  " diff: " + (pageFile.getPageCount() - pageFile.getFreePageCount()) + " fileSize:" + pageFile.getFile().length());
+//
+//            if (lastDiff != 0) {
+//                assertEquals("Only use X pages per iteration: " + repeats, lastDiff, pageFile.getPageCount() - pageFile.getFreePageCount());
+//            }
+//            lastDiff = pageFile.getPageCount() - pageFile.getFreePageCount();
+//        }
+//    }
+//
+//    public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+//        this.addCombinationValues("defaultPersistenceAdapter",
+//                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+//    }
+//
+//    public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+//
+//        if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
+//            // https://issues.apache.org/jira/browse/AMQ-4296
+//            return;
+//        }
+//
+//        // create offline subs 1
+//        Connection con = createConnection("offCli1");
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        session.close();
+//        con.close();
+//
+//        // create offline subs 2
+//        con = createConnection("offCli2");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        session.close();
+//        con.close();
+//
+//        // send messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageProducer producer = session.createProducer(null);
+//
+//        int filtered = 0;
+//        for (int i = 0; i < 10; i++) {
+//            boolean filter = (int) (Math.random() * 2) >= 1;
+//            if (filter)
+//                filtered++;
+//
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", filter ? "true" : "false");
+//            producer.send(topic, message);
+//        }
+//
+//        LOG.info("sent: " + filtered);
+//        Thread.sleep(1 * 1000);
+//        session.close();
+//        con.close();
+//
+//        // restart broker
+//        Thread.sleep(3 * 1000);
+//        broker.stop();
+//        createBroker(false /*deleteAllMessages*/);
+//
+//        // send more messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        producer = session.createProducer(null);
+//
+//        for (int i = 0; i < 10; i++) {
+//            boolean filter = (int) (Math.random() * 2) >= 1;
+//            if (filter)
+//                filtered++;
+//
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", filter ? "true" : "false");
+//            producer.send(topic, message);
+//        }
+//
+//        LOG.info("after restart, total sent with filter='true': " + filtered);
+//        Thread.sleep(1 * 1000);
+//        session.close();
+//        con.close();
+//
+//        // test offline subs
+//        con = createConnection("offCli1");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        Listener listener = new Listener("1>");
+//        consumer.setMessageListener(listener);
+//
+//        Connection con3 = createConnection("offCli2");
+//        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        Listener listener3 = new Listener();
+//        consumer3.setMessageListener(listener3);
+//
+//        Thread.sleep(3 * 1000);
+//
+//        session.close();
+//        con.close();
+//        session3.close();
+//        con3.close();
+//
+//        assertEquals(filtered, listener.count);
+//        assertEquals(filtered, listener3.count);
+//    }
+//
+//    public void initCombosForTestOfflineAfterRestart() throws Exception {
+//        this.addCombinationValues("defaultPersistenceAdapter",
+//                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+//    }
+//
+//    public void testOfflineSubscriptionAfterRestart() throws Exception {
+//        // create offline subs 1
+//        Connection con = createConnection("offCli1");
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
+//        Listener listener = new Listener();
+//        consumer.setMessageListener(listener);
+//
+//        // send messages
+//        MessageProducer producer = session.createProducer(null);
+//
+//        int sent = 0;
+//        for (int i = 0; i < 10; i++) {
+//            sent++;
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", "false");
+//            producer.send(topic, message);
+//        }
+//
+//        LOG.info("sent: " + sent);
+//        Thread.sleep(5 * 1000);
+//        session.close();
+//        con.close();
+//
+//        assertEquals(sent, listener.count);
+//
+//        // restart broker
+//        Thread.sleep(3 * 1000);
+//        broker.stop();
+//        createBroker(false /*deleteAllMessages*/);
+//
+//        // send more messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        producer = session.createProducer(null);
+//
+//        for (int i = 0; i < 10; i++) {
+//            sent++;
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", "false");
+//            producer.send(topic, message);
+//        }
+//
+//        LOG.info("after restart, sent: " + sent);
+//        Thread.sleep(1 * 1000);
+//        session.close();
+//        con.close();
+//
+//        // test offline subs
+//        con = createConnection("offCli1");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+//        consumer.setMessageListener(listener);
+//
+//        Thread.sleep(3 * 1000);
+//
+//        session.close();
+//        con.close();
+//
+//        assertEquals(sent, listener.count);
+//    }
+//
+//    public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
+//        // create offline subs 1
+//        Connection con = createConnection("offCli1");
+//        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//        session.close();
+//        con.close();
+//
+//        // create offline subs 2
+//        con = createConnection("offCli2");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session.createDurableSubscriber(topic, "SubsId", null, true);
+//        session.close();
+//        con.close();
+//
+//        // send messages
+//        con = createConnection();
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageProducer producer = session.createProducer(null);
+//
+//        int sent = 0;
+//        for (int i = 0; i < 10; i++) {
+//            boolean filter = (int) (Math.random() * 2) >= 1;
+//
+//            sent++;
+//
+//            Message message = session.createMessage();
+//            message.setStringProperty("filter", filter ? "true" : "false");
+//            producer.send(topic, message);
+//        }
+//
+//        Thread.sleep(1 * 1000);
+//
+//        Connection con2 = createConnection("offCli1");
+//        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        session2.unsubscribe("SubsId");
+//        session2.close();
+//        con2.close();
+//
+//        // consume all messages
+//        con = createConnection("offCli2");
+//        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+//        Listener listener = new Listener("SubsId");
+//        consumer.setMessageListener(listener);
+//
+//        Thread.sleep(3 * 1000);
+//
+//        session.close();
+//        con.close();
+//
+//        assertEquals("offline consumer got all", sent, listener.count);
+//    }
+//
+//    public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
+//        final int messageCount = 1000;
+//        Connection con = null;
+//        Session session = null;
+//        final int numConsumers = 10;
+//        for (int i = 0; i <= numConsumers; i++) {
+//            con = createConnection("cli" + i);
+//            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//            session.createDurableSubscriber(topic, "SubsId", null, true);
+//            session.close();
+//            con.close();
+//        }
+//
+//        class CheckForDupsClient implements Runnable {
+//            HashSet<Long> ids = new HashSet<Long>();
+//            final int id;
+//
+//            public CheckForDupsClient(int id) {
+//                this.id = id;
+//            }
+//
+//            @Override
+//            public void run() {
+//                try {
+//                    Connection con = createConnection("cli" + id);
+//                    Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//                    for (int j=0;j<2;j++) {
+//                        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+//                        for (int i = 0; i < messageCount/2; i++) {
+//                            Message message = consumer.receive(4000);
+//                            assertNotNull(message);
+//                            long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
+//                            assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
+//                        }
+//                        consumer.close();
+//                    }
+//
+//                    // verify no duplicates left
+//                    MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+//                    Message message = consumer.receive(4000);
+//                    if (message != null) {
+//                        long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
+//                        assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
+//                    }
+//                    assertNull(message);
+//
+//
+//                    session.close();
+//                    con.close();
+//                } catch (Throwable e) {
+//                    e.printStackTrace();
+//                    exceptions.add(e);
+//                }
+//            }
+//        }
+//
+//        final String payLoad = new String(new byte[1000]);
+//        con = createConnection();
+//        final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED);
+//        MessageProducer producer = sendSession.createProducer(topic);
+//        for (int i = 0; i < messageCount; i++) {
+//            producer.send(sendSession.createTextMessage(payLoad));
+//        }
+//
+//        ExecutorService executorService = Executors.newCachedThreadPool();
+//
+//        // concurrent commit and activate
+//        executorService.execute(new Runnable() {
+//            @Override
+//            public void run() {
+//                try {
+//                    sendSession.commit();
+//                } catch (JMSException e) {
+//                    e.printStackTrace();
+//                    exceptions.add(e);
+//                }
+//            }
+//        });
+//        for (int i = 0; i < numConsumers; i++) {
+//            executorService.execute(new CheckForDupsClient(i));
+//        }
+//
+//        executorService.shutdown();
+//        executorService.awaitTermination(5, TimeUnit.MINUTES);
+//        con.close();
+//
+//        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+//    }
+//
+//    public void testOrderOnActivateDeactivate() throws Exception {
+//        for (int i=0;i<10;i++) {
+//            LOG.info("Iteration: " + i);
+//            doTestOrderOnActivateDeactivate();
+//            broker.stop();
+//            createBroker(true /*deleteAllMessages*/);
+//        }
+//    }
 
     public void doTestOrderOnActivateDeactivate() throws Exception {
         final int messageCount = 1000;
@@ -1229,217 +1223,217 @@ public class DurableSubscriptionOfflineT
         assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
     }
 
-    public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
-        // create offline subs 1
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int filtered = 0;
-        for (int i = 0; i < 10; i++) {
-            boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
-            if (filter)
-                filtered++;
-
-            Message message = session.createMessage();
-            message.setStringProperty("filter", filter ? "true" : "false");
-            producer.send(topic, message);
-        }
-
-        LOG.info("sent: " + filtered);
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // test offline subs
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.unsubscribe("SubsId");
-        session.close();
-        con.close();
-
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        assertEquals(0, listener.count);
-    }
-
-    public void testAllConsumed() throws Exception {
-        final String filter = "filter = 'true'";
-        Connection con = createConnection("cli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", filter, true);
-        session.close();
-        con.close();
-
-        con = createConnection("cli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", filter, true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-            sent++;
-        }
-
-        LOG.info("sent: " + sent);
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        con = createConnection("cli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-        Thread.sleep(3 * 1000);
-        session.close();
-        con.close();
-
-        assertEquals(sent, listener.count);
-
-        LOG.info("cli2 pull 2");
-        con = createConnection("cli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        assertNotNull("got message", consumer.receive(2000));
-        assertNotNull("got message", consumer.receive(2000));
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        producer = session.createProducer(null);
-
-        sent = 0;
-        for (int i = 0; i < 2; i++) {
-            Message message = session.createMessage();
-            message.setStringProperty("filter", i==1 ? "true" : "false");
-            producer.send(topic, message);
-            sent++;
-        }
-        LOG.info("sent: " + sent);
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        LOG.info("cli1 again, should get 1 new ones");
-        con = createConnection("cli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        listener = new Listener();
-        consumer.setMessageListener(listener);
-        Thread.sleep(3 * 1000);
-        session.close();
-        con.close();
-
-        assertEquals(1, listener.count);
-    }
-
-    // https://issues.apache.org/jira/browse/AMQ-3190
-    public void testNoMissOnMatchingSubAfterRestart() throws Exception {
-
-        final String filter = "filter = 'true'";
-        Connection con = createConnection("cli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", filter, true);
-        session.close();
-        con.close();
-
-        // send unmatched messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        // message for cli1 to keep it interested
-        Message message = session.createMessage();
-        message.setStringProperty("filter", "true");
-        message.setIntProperty("ID", 0);
-        producer.send(topic, message);
-        sent++;
-
-        for (int i = sent; i < 10; i++) {
-            message = session.createMessage();
-            message.setStringProperty("filter", "false");
-            message.setIntProperty("ID", i);
-            producer.send(topic, message);
-            sent++;
-        }
-        con.close();
-        LOG.info("sent: " + sent);
-
-        // new sub at id 10
-        con = createConnection("cli2");

[... 590 lines stripped ...]