You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2013/12/19 14:37:21 UTC

[1/2] For AMQ-4874, broke into multiple parts, and converted to use JUnit4 Parameterized instead of CombinationTestSupport

Updated Branches:
  refs/heads/trunk a64976a37 -> 57f5d49ae


http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
index 1473025..15c0627 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
@@ -16,50 +16,32 @@
  */
 package org.apache.activemq.usecases;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.management.ObjectName;
-
-import junit.framework.Test;
+import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
-import org.apache.activemq.broker.jmx.TopicViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
 
-public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
+public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTestBase {
 
     private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
-    public boolean usePrioritySupport = Boolean.TRUE;
-    public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
-    public boolean keepDurableSubsActive = true;
-    private BrokerService broker;
-    private ActiveMQTopic topic;
-    private final List<Throwable> exceptions = new ArrayList<Throwable>();
 
     @Override
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
@@ -68,84 +50,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         return connectionFactory;
     }
 
-    @Override
-    protected Connection createConnection() throws Exception {
-        return createConnection("cliName");
-    }
-
-    protected Connection createConnection(String name) throws Exception {
-        Connection con = super.createConnection();
-        con.setClientID(name);
-        con.start();
-        return con;
-    }
-
-    public static Test suite() {
-        return suite(DurableSubscriptionOfflineTest.class);
-    }
-
-    @Override
-    protected void setUp() throws Exception {
-        setAutoFail(true);
-        setMaxTestTime(2 * 60 * 1000);
-        exceptions.clear();
-        topic = (ActiveMQTopic) createDestination();
-        createBroker();
-        super.setUp();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        super.tearDown();
-        destroyBroker();
-    }
-
-    private void createBroker() throws Exception {
-        createBroker(true);
-    }
-
-    private void createBroker(boolean deleteAllMessages) throws Exception {
-        broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
-        broker.setBrokerName(getName(true));
-        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
-        broker.getManagementContext().setCreateConnector(false);
-        broker.setAdvisorySupport(false);
-        broker.setKeepDurableSubsActive(keepDurableSubsActive);
-        broker.addConnector("tcp://0.0.0.0:0");
-
-        if (usePrioritySupport) {
-            PolicyEntry policy = new PolicyEntry();
-            policy.setPrioritizedMessages(true);
-            PolicyMap policyMap = new PolicyMap();
-            policyMap.setDefaultEntry(policy);
-            broker.setDestinationPolicy(policyMap);
-        }
-
-        setDefaultPersistenceAdapter(broker);
-        if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
-            // ensure it kicks in during tests
-            ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
-        } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
-            // have lots of journal files
-            ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
-        }
-        broker.start();
-        broker.waitUntilStarted();
-    }
-
-    private void destroyBroker() throws Exception {
-        if (broker != null)
-            broker.stop();
-    }
-
-    public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-        this.addCombinationValues("usePrioritySupport",
-                new Object[]{ Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testConsumeOnlyMatchedMessages() throws Exception {
+    @Test(timeout = 60 * 1000)
+    public void testConsumeAllMatchedMessages() throws Exception {
         // create durable subscription
         Connection con = createConnection();
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -160,15 +66,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
 
         int sent = 0;
         for (int i = 0; i < 10; i++) {
-            boolean filter = i % 2 == 1;
-            if (filter)
-                sent++;
-
+            sent++;
             Message message = session.createMessage();
-            message.setStringProperty("filter", filter ? "true" : "false");
+            message.setStringProperty("filter", "true");
             producer.send(topic, message);
         }
 
+        Thread.sleep(1 * 1000);
+
         session.close();
         con.close();
 
@@ -176,7 +81,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         con = createConnection();
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
         consumer.setMessageListener(listener);
 
         Thread.sleep(3 * 1000);
@@ -187,110 +92,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         assertEquals(sent, listener.count);
     }
 
-     public void testConsumeAllMatchedMessages() throws Exception {
-         // create durable subscription
-         Connection con = createConnection();
-         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         session.close();
-         con.close();
-
-         // send messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(null);
-
-         int sent = 0;
-         for (int i = 0; i < 10; i++) {
-             sent++;
-             Message message = session.createMessage();
-             message.setStringProperty("filter", "true");
-             producer.send(topic, message);
-         }
-
-         Thread.sleep(1 * 1000);
-
-         session.close();
-         con.close();
-
-         // consume messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         Listener listener = new Listener();
-         consumer.setMessageListener(listener);
-
-         Thread.sleep(3 * 1000);
-
-         session.close();
-         con.close();
-
-         assertEquals(sent, listener.count);
-     }
-
-    public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-               new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-        this.addCombinationValues("usePrioritySupport",
-                new Object[]{ Boolean.TRUE, Boolean.FALSE});
-    }
-
-     public void testVerifyAllConsumedAreAcked() throws Exception {
-         // create durable subscription
-         Connection con = createConnection();
-         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         session.close();
-         con.close();
-
-         // send messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer producer = session.createProducer(null);
-
-         int sent = 0;
-         for (int i = 0; i < 10; i++) {
-             sent++;
-             Message message = session.createMessage();
-             message.setStringProperty("filter", "true");
-             producer.send(topic, message);
-         }
-
-         Thread.sleep(1 * 1000);
-
-         session.close();
-         con.close();
-
-         // consume messages
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         Listener listener = new Listener();
-         consumer.setMessageListener(listener);
-
-         Thread.sleep(3 * 1000);
-
-         session.close();
-         con.close();
-
-         LOG.info("Consumed: " + listener.count);
-         assertEquals(sent, listener.count);
-
-         // consume messages again, should not get any
-         con = createConnection();
-         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-         listener = new Listener();
-         consumer.setMessageListener(listener);
-
-         Thread.sleep(3 * 1000);
-
-         session.close();
-         con.close();
-
-         assertEquals(0, listener.count);
-     }
 
+    @Test(timeout = 60 * 1000)
     public void testTwoOfflineSubscriptionCanConsume() throws Exception {
         // create durable subscription 1
         Connection con = createConnection("cliId1");
@@ -303,7 +106,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         Connection con2 = createConnection("cliId2");
         Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener2 = new Listener();
+        DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
         consumer2.setMessageListener(listener2);
 
         // send messages
@@ -334,273 +137,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         con = createConnection("cliId1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        assertEquals("offline consumer got all", sent, listener.count);
-    }
-
-    public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
-        this.addCombinationValues("keepDurableSubsActive",
-                new Object[]{Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testJMXCountersWithOfflineSubs() throws Exception {
-        // create durable subscription 1
-        Connection con = createConnection("cliId1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", null, true);
-        session.close();
-        con.close();
-
-        // restart broker
-        broker.stop();
-        createBroker(false /*deleteAllMessages*/);
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            producer.send(topic, message);
-        }
-        session.close();
-        con.close();
-
-        // consume some messages
-        con = createConnection("cliId1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-
-        for (int i=0; i<sent/2; i++) {
-            Message m =  consumer.receive(4000);
-            assertNotNull("got message: " + i, m);
-            LOG.info("Got :" + i + ", " + m);
-        }
-
-        // check some counters while active
-        ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
-        LOG.info("active durable sub name: " + activeDurableSubName);
-        final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
-                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
-
-        assertTrue("is active", durableSubscriptionView.isActive());
-        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
-        assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
-            }
-        }));
-        assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
-
-
-        ObjectName destinationName = broker.getAdminView().getTopics()[0];
-        TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
-        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
-        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
-        assertEquals("inflight", 5, topicView.getInFlightCount());
-
-        session.close();
-        con.close();
-
-        // check some counters when inactive
-        ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
-        LOG.info("inactive durable sub name: " + inActiveDurableSubName);
-        DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
-                broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
-
-        assertTrue("is not active", !durableSubscriptionView1.isActive());
-        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
-        assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
-        assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
-
-        // destination view
-        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
-        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
-        assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
-
-        // consume the rest
-        con = createConnection("cliId1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-
-        for (int i=0; i<sent/2;i++) {
-            Message m =  consumer.receive(30000);
-            assertNotNull("got message: " + i, m);
-            LOG.info("Got :" + i + ", " + m);
-        }
-
-        activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
-        LOG.info("durable sub name: " + activeDurableSubName);
-        final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
-                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
-
-        assertTrue("is active", durableSubscriptionView2.isActive());
-        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
-        assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                long val = durableSubscriptionView2.getDequeueCounter();
-                LOG.info("dequeue count:" + val);
-                return 10 == val;
-            }
-        }));
-    }
-
-    public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-        this.addCombinationValues("usePrioritySupport",
-                new Object[]{ Boolean.TRUE, Boolean.FALSE});
-    }
-
-    public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        con = createConnection("offCli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        Connection con2 = createConnection("onlineCli1");
-        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener2 = new Listener();
-        consumer2.setMessageListener(listener2);
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // test online subs
-        Thread.sleep(3 * 1000);
-        session2.close();
-        con2.close();
-        assertEquals(sent, listener2.count);
-
-        // restart broker
-        broker.stop();
-        createBroker(false /*deleteAllMessages*/);
-
-        // test offline
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-
-        Connection con3 = createConnection("offCli2");
-        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-        Listener listener3 = new Listener();
-        consumer3.setMessageListener(listener3);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-        session3.close();
-        con3.close();
-
-        assertEquals(sent, listener.count);
-        assertEquals(sent, listener3.count);
-    }
-
-    public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-    }
-
-    public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
-        // create durable subscription 1
-        Connection con = createConnection("cliId1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-
-        // create durable subscription 2
-        Connection con2 = createConnection("cliId2");
-        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener2 = new Listener();
-        consumer2.setMessageListener(listener2);
-
-        assertEquals(0, listener2.count);
-        session2.close();
-        con2.close();
-
-        // send some more
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        con2 = createConnection("cliId2");
-        session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        listener2 = new Listener("cliId2");
-        consumer2.setMessageListener(listener2);
-        // test online subs
-        Thread.sleep(3 * 1000);
-
-        assertEquals(10, listener2.count);
-
-        // consume all messages
-        con = createConnection("cliId1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener("cliId1");
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
         consumer.setMessageListener(listener);
 
         Thread.sleep(3 * 1000);
@@ -611,117 +148,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         assertEquals("offline consumer got all", sent, listener.count);
     }
 
-    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-    }
-
-    private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
-    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
-        // create offline subs 1
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", filter, true);
-        session.close();
-        con.close();
-
-        // create offline subs 2
-        con = createConnection("offCli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", filter, true);
-        session.close();
-        con.close();
-
-        // create online subs
-        Connection con2 = createConnection("onlineCli1");
-        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener2 = new Listener();
-        consumer2.setMessageListener(listener2);
-
-        // create non-durable consumer
-        Connection con4 = createConnection("nondurableCli");
-        Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
-        Listener listener4 = new Listener();
-        consumer4.setMessageListener(listener4);
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        boolean hasRelevant = false;
-        int filtered = 0;
-        for (int i = 0; i < 100; i++) {
-            int postf = (int) (Math.random() * 9) + 1;
-            String d = "D" + postf;
-
-            if ("D1".equals(d) || "D2".equals(d)) {
-                hasRelevant = true;
-                filtered++;
-            }
-
-            Message message = session.createMessage();
-            message.setStringProperty("$a", "A1");
-            message.setStringProperty("$d", d);
-            producer.send(topic, message);
-        }
-
-        Message message = session.createMessage();
-        message.setStringProperty("$a", "A1");
-        message.setBooleanProperty("$b", true);
-        message.setBooleanProperty("$c", hasRelevant);
-        producer.send(topic, message);
-
-        if (hasRelevant)
-            filtered++;
-
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        Thread.sleep(3 * 1000);
-
-        // test non-durable consumer
-        session4.close();
-        con4.close();
-        assertEquals(filtered, listener4.count); // succeeded!
-
-        // test online subs
-        session2.close();
-        con2.close();
-        assertEquals(filtered, listener2.count); // succeeded!
-
-        // test offline 1
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener = new FilterCheckListener();
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-        session.close();
-        con.close();
-
-        assertEquals(filtered, listener.count);
-
-        // test offline 2
-        Connection con3 = createConnection("offCli2");
-        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener3 = new FilterCheckListener();
-        consumer3.setMessageListener(listener3);
-
-        Thread.sleep(3 * 1000);
-        session3.close();
-        con3.close();
-
-        assertEquals(filtered, listener3.count);
-        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
-    }
-
+    @Test(timeout = 60 * 1000)
     public void testRemovedDurableSubDeletes() throws Exception {
+        String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
         // create durable subscription 1
         Connection con = createConnection("cliId1");
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -753,13 +182,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         con = createConnection("offCli2");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener = new Listener();
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
         consumer.setMessageListener(listener);
         session.close();
         con.close();
         assertEquals(0, listener.count);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testRemovedDurableSubDeletesFromIndex() throws Exception {
 
         if (! (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
@@ -810,169 +240,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         }
     }
 
-    public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-    }
-
-    public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
-
-        if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
-            // https://issues.apache.org/jira/browse/AMQ-4296
-            return;
-        }
-
-        // create offline subs 1
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // create offline subs 2
-        con = createConnection("offCli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int filtered = 0;
-        for (int i = 0; i < 10; i++) {
-            boolean filter = (int) (Math.random() * 2) >= 1;
-            if (filter)
-                filtered++;
-
-            Message message = session.createMessage();
-            message.setStringProperty("filter", filter ? "true" : "false");
-            producer.send(topic, message);
-        }
-
-        LOG.info("sent: " + filtered);
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // restart broker
-        Thread.sleep(3 * 1000);
-        broker.stop();
-        createBroker(false /*deleteAllMessages*/);
-
-        // send more messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        producer = session.createProducer(null);
-
-        for (int i = 0; i < 10; i++) {
-            boolean filter = (int) (Math.random() * 2) >= 1;
-            if (filter)
-                filtered++;
-
-            Message message = session.createMessage();
-            message.setStringProperty("filter", filter ? "true" : "false");
-            producer.send(topic, message);
-        }
-
-        LOG.info("after restart, total sent with filter='true': " + filtered);
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // test offline subs
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener("1>");
-        consumer.setMessageListener(listener);
-
-        Connection con3 = createConnection("offCli2");
-        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener3 = new Listener();
-        consumer3.setMessageListener(listener3);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-        session3.close();
-        con3.close();
-
-        assertEquals(filtered, listener.count);
-        assertEquals(filtered, listener3.count);
-    }
-
-    public void initCombosForTestOfflineSubscriptionAfterRestart() throws Exception {
-        this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
-    }
-
-    public void testOfflineSubscriptionAfterRestart() throws Exception {
-        // create offline subs 1
-        Connection con = createConnection("offCli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-
-        // send messages
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "false");
-            producer.send(topic, message);
-        }
-
-        LOG.info("sent: " + sent);
-        Thread.sleep(5 * 1000);
-        session.close();
-        con.close();
-
-        assertEquals(sent, listener.count);
-
-        // restart broker
-        Thread.sleep(3 * 1000);
-        broker.stop();
-        createBroker(false /*deleteAllMessages*/);
-
-        // send more messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        producer = session.createProducer(null);
-
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "false");
-            producer.send(topic, message);
-        }
-
-        LOG.info("after restart, sent: " + sent);
-        Thread.sleep(1 * 1000);
-        session.close();
-        con.close();
-
-        // test offline subs
-        con = createConnection("offCli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        assertEquals(sent, listener.count);
-    }
-
+    @Test(timeout = 60 * 1000)
     public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");
@@ -1016,7 +284,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         con = createConnection("offCli2");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-        Listener listener = new Listener("SubsId");
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("SubsId");
         consumer.setMessageListener(listener);
 
         Thread.sleep(3 * 1000);
@@ -1027,6 +295,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         assertEquals("offline consumer got all", sent, listener.count);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
         final int messageCount = 1000;
         Connection con = null;
@@ -1117,10 +386,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
     }
 
 
-    /*
-     * Ignoring for now, see https://issues.apache.org/jira/browse/AMQ-4874
-     */
-    public void XXXtestOrderOnActivateDeactivate() throws Exception {
+    @Ignore("see https://issues.apache.org/jira/browse/AMQ-4874")
+    @Test(timeout = 60 * 1000)
+    public void testOrderOnActivateDeactivate() throws Exception {
         for (int i=0;i<10;i++) {
             LOG.info("Iteration: " + i);
             doTestOrderOnActivateDeactivate();
@@ -1145,13 +413,13 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         }
 
         final String url = "failover:(tcp://localhost:"
-            + (broker.getTransportConnectors().get(1).getConnectUri()).getPort()
-            + "?wireFormat.maxInactivityDuration=0)?"
-            + "jms.watchTopicAdvisories=false&"
-            + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
-            + "jms.sendAcksAsync=true&"
-            + "initialReconnectDelay=100&maxReconnectDelay=30000&"
-            + "useExponentialBackOff=true";
+                + (broker.getTransportConnectors().get(1).getConnectUri()).getPort()
+                + "?wireFormat.maxInactivityDuration=0)?"
+                + "jms.watchTopicAdvisories=false&"
+                + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
+                + "jms.sendAcksAsync=true&"
+                + "initialReconnectDelay=100&maxReconnectDelay=30000&"
+                + "useExponentialBackOff=true";
         final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory(url);
 
         class CheckOrderClient implements Runnable {
@@ -1235,6 +503,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
     }
 
+    @Test(timeout = 60 * 1000)
     public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");
@@ -1274,7 +543,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         con = createConnection("offCli1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
         consumer.setMessageListener(listener);
 
         Thread.sleep(3 * 1000);
@@ -1285,6 +554,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         assertEquals(0, listener.count);
     }
 
+    @Test(timeout = 60 * 1000)
     public void testAllConsumed() throws Exception {
         final String filter = "filter = 'true'";
         Connection con = createConnection("cli1");
@@ -1320,7 +590,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         con = createConnection("cli1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener = new Listener();
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
         consumer.setMessageListener(listener);
         Thread.sleep(3 * 1000);
         session.close();
@@ -1358,7 +628,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         con = createConnection("cli1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        listener = new Listener();
+        listener = new DurableSubscriptionOfflineTestListener();
         consumer.setMessageListener(listener);
         Thread.sleep(3 * 1000);
         session.close();
@@ -1368,6 +638,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
     }
 
     // https://issues.apache.org/jira/browse/AMQ-3190
+    @Test(timeout = 60 * 1000)
     public void testNoMissOnMatchingSubAfterRestart() throws Exception {
 
         final String filter = "filter = 'true'";
@@ -1447,80 +718,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
         con.close();
     }
 
-    // use very small journal to get lots of files to cleanup
-    public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
-        this.addCombinationValues("journalMaxFileLength",
-                new Object[]{new Integer(64 * 1024)});
-        this.addCombinationValues("keepDurableSubsActive",
-                new Object[]{Boolean.TRUE, Boolean.FALSE});
-    }
-
-    // https://issues.apache.org/jira/browse/AMQ-3206
-    public void testCleanupDeletedSubAfterRestart() throws Exception {
-        Connection con = createConnection("cli1");
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", null, true);
-        session.close();
-        con.close();
-
-        con = createConnection("cli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", null, true);
-        session.close();
-        con.close();
-
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        final int toSend = 500;
-        final String payload = new byte[40*1024].toString();
-        int sent = 0;
-        for (int i = sent; i < toSend; i++) {
-            Message message = session.createTextMessage(payload);
-            message.setStringProperty("filter", "false");
-            message.setIntProperty("ID", i);
-            producer.send(topic, message);
-            sent++;
-        }
-        con.close();
-        LOG.info("sent: " + sent);
-
-        // kill off cli1
-        con = createConnection("cli1");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.unsubscribe("SubsId");
-
-        destroyBroker();
-        createBroker(false);
-
-        con = createConnection("cli2");
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-        final Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-        assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                LOG.info("Want: " + toSend  + ", current: " + listener.count);
-                return listener.count == toSend;
-            }
-        }));
-        session.close();
-        con.close();
-
-        destroyBroker();
-        createBroker(false);
-        final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
-        assertTrue("Should have less than three journal files left but was: " +
-            pa.getStore().getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return pa.getStore().getJournal().getFileMap().size() <= 3;
-            }
-        }));
-    }
 
 //    // https://issues.apache.org/jira/browse/AMQ-3768
 //    public void testPageReuse() throws Exception {
@@ -1679,46 +876,4 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
 //        assertTrue("No exceptions expected, but was: " + exceptions, exceptions.isEmpty());
 //    }
 
-    public static class Listener implements MessageListener {
-        int count = 0;
-        String id = null;
-
-        Listener() {
-        }
-        Listener(String id) {
-            this.id = id;
-        }
-        @Override
-        public void onMessage(Message message) {
-            count++;
-            if (id != null) {
-                try {
-                    LOG.info(id + ", " + message.getJMSMessageID());
-                } catch (Exception ignored) {}
-            }
-        }
-    }
-
-    public class FilterCheckListener extends Listener  {
-
-        @Override
-        public void onMessage(Message message) {
-            count++;
-
-            try {
-                Object b = message.getObjectProperty("$b");
-                if (b != null) {
-                    boolean c = message.getBooleanProperty("$c");
-                    assertTrue("", c);
-                } else {
-                    String d = message.getStringProperty("$d");
-                    assertTrue("", "D1".equals(d) || "D2".equals(d));
-                }
-            }
-            catch (JMSException e) {
-                e.printStackTrace();
-                exceptions.add(e);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
new file mode 100644
index 0000000..3bffb4e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageListener;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public abstract class DurableSubscriptionOfflineTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTestBase.class);
+    public boolean usePrioritySupport = Boolean.TRUE;
+    public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+    public boolean keepDurableSubsActive = true;
+    protected BrokerService broker;
+    protected ActiveMQTopic topic;
+    protected final List<Throwable> exceptions = new ArrayList<Throwable>();
+    protected ActiveMQConnectionFactory connectionFactory;
+    protected boolean isTopic = true;
+    public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
+        connectionFactory.setWatchTopicAdvisories(false);
+        return connectionFactory;
+    }
+
+    protected Connection createConnection() throws Exception {
+        return createConnection("cliName");
+    }
+
+    protected Connection createConnection(String name) throws Exception {
+        ConnectionFactory connectionFactory1 = createConnectionFactory();
+        Connection connection = connectionFactory1.createConnection();
+        connection.setClientID(name);
+        connection.start();
+        return connection;
+    }
+
+    public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
+        if (connectionFactory == null) {
+            connectionFactory = createConnectionFactory();
+            assertTrue("Should have created a connection factory!", connectionFactory != null);
+        }
+        return connectionFactory;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        exceptions.clear();
+        topic = (ActiveMQTopic) createDestination();
+        createBroker();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        destroyBroker();
+    }
+
+    protected void createBroker() throws Exception {
+        createBroker(true);
+    }
+
+    protected void createBroker(boolean deleteAllMessages) throws Exception {
+        String currentTestName = getName(true);
+        broker = BrokerFactory.createBroker("broker:(vm://" + currentTestName +")");
+        broker.setBrokerName(currentTestName);
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        broker.getManagementContext().setCreateConnector(false);
+        broker.setAdvisorySupport(false);
+        broker.setKeepDurableSubsActive(keepDurableSubsActive);
+        broker.addConnector("tcp://0.0.0.0:0");
+
+        if (usePrioritySupport) {
+            PolicyEntry policy = new PolicyEntry();
+            policy.setPrioritizedMessages(true);
+            PolicyMap policyMap = new PolicyMap();
+            policyMap.setDefaultEntry(policy);
+            broker.setDestinationPolicy(policyMap);
+        }
+
+        setDefaultPersistenceAdapter(broker);
+        if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
+            // ensure it kicks in during tests
+            ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
+        } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+            // have lots of journal files
+            ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
+        }
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    protected void destroyBroker() throws Exception {
+        if (broker != null)
+            broker.stop();
+    }
+
+    protected Destination createDestination(String subject) {
+        if (isTopic) {
+            return new ActiveMQTopic(subject);
+        } else {
+            return new ActiveMQQueue(subject);
+        }
+    }
+
+    protected Destination createDestination() {
+        return createDestination(getDestinationString());
+    }
+
+    /**
+     * Returns the name of the destination used in this test case
+     */
+    protected String getDestinationString() {
+        return getClass().getName() + "." + getName(true);
+    }
+
+
+    public String getName() {
+        return getName(false);
+    }
+
+    protected String getName(boolean original) {
+        String currentTestName = testName.getMethodName();
+        currentTestName = currentTestName.replace("[","");
+        currentTestName = currentTestName.replace("]","");
+        return currentTestName;
+    }
+
+    public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
+        return setPersistenceAdapter(broker, defaultPersistenceAdapter);
+    }
+
+    public PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException {
+        PersistenceAdapter adapter = null;
+        switch (choice) {
+            case JDBC:
+                LOG.debug(">>>> setPersistenceAdapter to JDBC ");
+                adapter = new JDBCPersistenceAdapter();
+                break;
+            case KahaDB:
+                LOG.debug(">>>> setPersistenceAdapter to KahaDB ");
+                adapter = new KahaDBPersistenceAdapter();
+                break;
+            case LevelDB:
+                LOG.debug(">>>> setPersistenceAdapter to LevelDB ");
+                adapter = new LevelDBPersistenceAdapter();
+                break;
+            case MEM:
+                LOG.debug(">>>> setPersistenceAdapter to MEM ");
+                adapter = new MemoryPersistenceAdapter();
+                break;
+        }
+        broker.setPersistenceAdapter(adapter);
+        return adapter;
+    }
+}
+
+class DurableSubscriptionOfflineTestListener implements MessageListener {
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTestListener.class);
+    int count = 0;
+    String id = null;
+
+    DurableSubscriptionOfflineTestListener() {}
+
+    DurableSubscriptionOfflineTestListener(String id) {
+        this.id = id;
+    }
+    @Override
+    public void onMessage(javax.jms.Message message) {
+        count++;
+        if (id != null) {
+            try {
+                LOG.info(id + ", " + message.getJMSMessageID());
+            } catch (Exception ignored) {}
+        }
+    }
+}
\ No newline at end of file


[2/2] git commit: For AMQ-4874, broke into multiple parts, and converted to use JUnit4 Parameterized instead of CombinationTestSupport

Posted by ke...@apache.org.
For AMQ-4874, broke into multiple parts, and converted to use JUnit4 Parameterized instead of CombinationTestSupport


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/57f5d49a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/57f5d49a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/57f5d49a

Branch: refs/heads/trunk
Commit: 57f5d49ae9a76f2a3a17fa7c7966130c7a7352fe
Parents: a64976a
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Thu Dec 19 14:37:12 2013 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Thu Dec 19 14:37:12 2013 +0100

----------------------------------------------------------------------
 .../DurableSubscriptionOffline1Test.java        | 248 +++++
 .../DurableSubscriptionOffline2Test.java        | 171 ++++
 .../DurableSubscriptionOffline3Test.java        | 424 +++++++++
 .../DurableSubscriptionOffline4Test.java        | 131 +++
 .../DurableSubscriptionOfflineTest.java         | 941 +------------------
 .../DurableSubscriptionOfflineTestBase.java     | 221 +++++
 6 files changed, 1243 insertions(+), 893 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
new file mode 100644
index 0000000..67745f9
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.actors.threadpool.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline1Test extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline1Test.class);
+
+    @Parameterized.Parameters(name = "{0}-{1}")
+    public static Collection<Object[]> getTestParameters() {
+        String osName = System.getProperty("os.name");
+        LOG.debug("Running on [" + osName + "]");
+
+        List<PersistenceAdapterChoice> persistenceAdapterChoices = new ArrayList<PersistenceAdapterChoice>();
+
+        persistenceAdapterChoices.add(PersistenceAdapterChoice.KahaDB);
+        persistenceAdapterChoices.add(PersistenceAdapterChoice.JDBC);
+        if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
+            //choices.add(levelDb);
+            persistenceAdapterChoices.add(PersistenceAdapterChoice.LevelDB);
+        }
+
+        List<Object[]> testParameters = new ArrayList<Object[]>();
+        Boolean[] booleanValues = {Boolean.FALSE, Boolean.TRUE};
+        List<Boolean> booleans = Arrays.asList(booleanValues);
+        for (Boolean booleanValue : booleans) {
+            for (PersistenceAdapterChoice persistenceAdapterChoice : persistenceAdapterChoices) {
+                Object[] currentChoice = {persistenceAdapterChoice, booleanValue};
+                testParameters.add(currentChoice);
+            }
+        }
+
+        return testParameters;
+    }
+
+    public DurableSubscriptionOffline1Test(PersistenceAdapterChoice adapter, Boolean usePrioritySupport) {
+        this.defaultPersistenceAdapter = adapter;
+        this.usePrioritySupport = usePrioritySupport.booleanValue();
+        LOG.debug(">>>> Created with adapter {} usePrioritySupport? {}", defaultPersistenceAdapter, usePrioritySupport);
+
+    }
+
+    @Test
+    public void testConsumeOnlyMatchedMessages() throws Exception {
+        // create durable subscription
+        Connection con = createConnection();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            boolean filter = i % 2 == 1;
+            if (filter)
+                sent++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        session.close();
+        con.close();
+
+        // consume messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals(sent, listener.count);
+    }
+
+    @Test
+    public void testVerifyAllConsumedAreAcked() throws Exception {
+        // create durable subscription
+        Connection con = createConnection();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+
+        session.close();
+        con.close();
+
+        // consume messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        LOG.info("Consumed: " + listener.count);
+        assertEquals(sent, listener.count);
+
+        // consume messages again, should not get any
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals(0, listener.count);
+    }
+
+    @Test
+    public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        Connection con2 = createConnection("onlineCli1");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+        consumer2.setMessageListener(listener2);
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // test online subs
+        Thread.sleep(3 * 1000);
+        session2.close();
+        con2.close();
+        assertEquals(sent, listener2.count);
+
+        // restart broker
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // test offline
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+        Connection con3 = createConnection("offCli2");
+        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+        DurableSubscriptionOfflineTestListener listener3 = new DurableSubscriptionOfflineTestListener();
+        consumer3.setMessageListener(listener3);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+        session3.close();
+        con3.close();
+
+        assertEquals(sent, listener.count);
+        assertEquals(sent, listener3.count);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
new file mode 100644
index 0000000..960d9ea
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline2Test.class);
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Boolean[]> getTestParameters() {
+        Boolean[] f = {Boolean.FALSE};
+        Boolean[] t = {Boolean.TRUE};
+        List<Boolean[]> booleanChoices = new ArrayList<Boolean[]>();
+        booleanChoices.add(f);
+        booleanChoices.add(t);
+
+        return booleanChoices;
+    }
+
+    public DurableSubscriptionOffline2Test(Boolean keepDurableSubsActive) {
+        this.keepDurableSubsActive = keepDurableSubsActive.booleanValue();
+
+        LOG.info(">>>> running {} with keepDurableSubsActive: {}", testName.getMethodName(), this.keepDurableSubsActive);
+    }
+
+
+    @Test(timeout = 60 * 1000)
+    public void testJMXCountersWithOfflineSubs() throws Exception {
+        // create durable subscription 1
+        Connection con = createConnection("cliId1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        // restart broker
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            producer.send(topic, message);
+        }
+        session.close();
+        con.close();
+
+        // consume some messages
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+        for (int i=0; i<sent/2; i++) {
+            Message m =  consumer.receive(4000);
+            assertNotNull("got message: " + i, m);
+            LOG.info("Got :" + i + ", " + m);
+        }
+
+        // check some counters while active
+        ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+        LOG.info("active durable sub name: " + activeDurableSubName);
+        final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+        assertTrue("is active", durableSubscriptionView.isActive());
+        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
+        assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
+            }
+        }));
+        assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
+
+
+        ObjectName destinationName = broker.getAdminView().getTopics()[0];
+        TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
+        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+        assertEquals("inflight", 5, topicView.getInFlightCount());
+
+        session.close();
+        con.close();
+
+        // check some counters when inactive
+        ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
+        LOG.info("inactive durable sub name: " + inActiveDurableSubName);
+        DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+        assertTrue("is not active", !durableSubscriptionView1.isActive());
+        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
+        assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
+        assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
+
+        // destination view
+        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+        assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
+
+        // consume the rest
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+        for (int i=0; i<sent/2;i++) {
+            Message m =  consumer.receive(30000);
+            assertNotNull("got message: " + i, m);
+            LOG.info("Got :" + i + ", " + m);
+        }
+
+        activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+        LOG.info("durable sub name: " + activeDurableSubName);
+        final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+        assertTrue("is active", durableSubscriptionView2.isActive());
+        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
+        assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                long val = durableSubscriptionView2.getDequeueCounter();
+                LOG.info("dequeue count:" + val);
+                return 10 == val;
+            }
+        }));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
new file mode 100644
index 0000000..c0aee13
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline3Test extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline3Test.class);
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<PersistenceAdapterChoice[]> getTestParameters() {
+        String osName = System.getProperty("os.name");
+        LOG.debug("Running on [" + osName + "]");
+
+        PersistenceAdapterChoice[] kahaDb = {PersistenceAdapterChoice.KahaDB};
+        PersistenceAdapterChoice[] jdbc = {PersistenceAdapterChoice.JDBC};
+        List<PersistenceAdapterChoice[]> choices = new ArrayList<PersistenceAdapterChoice[]>();
+        choices.add(kahaDb);
+        choices.add(jdbc);
+        if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
+            PersistenceAdapterChoice[] levelDb = {PersistenceAdapterChoice.LevelDB};
+            choices.add(levelDb);
+        }
+
+        return choices;
+    }
+
+    public DurableSubscriptionOffline3Test(PersistenceAdapterChoice persistenceAdapterChoice) {
+        this.defaultPersistenceAdapter = persistenceAdapterChoice;
+
+        LOG.info(">>>> running {} with persistenceAdapterChoice: {}", testName.getMethodName(), this.defaultPersistenceAdapter);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
+        // create durable subscription 1
+        Connection con = createConnection("cliId1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+
+        // create durable subscription 2
+        Connection con2 = createConnection("cliId2");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+        consumer2.setMessageListener(listener2);
+
+        assertEquals(0, listener2.count);
+        session2.close();
+        con2.close();
+
+        // send some more
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        con2 = createConnection("cliId2");
+        session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        listener2 = new DurableSubscriptionOfflineTestListener("cliId2");
+        consumer2.setMessageListener(listener2);
+        // test online subs
+        Thread.sleep(3 * 1000);
+
+        assertEquals(10, listener2.count);
+
+        // consume all messages
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("cliId1");
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals("offline consumer got all", sent, listener.count);
+    }
+
+    private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
+    @Test(timeout = 60 * 1000)
+    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
+        // create offline subs 1
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        // create offline subs 2
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        // create online subs
+        Connection con2 = createConnection("onlineCli1");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
+        DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+        consumer2.setMessageListener(listener2);
+
+        // create non-durable consumer
+        Connection con4 = createConnection("nondurableCli");
+        Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
+        DurableSubscriptionOfflineTestListener listener4 = new DurableSubscriptionOfflineTestListener();
+        consumer4.setMessageListener(listener4);
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        boolean hasRelevant = false;
+        int filtered = 0;
+        for (int i = 0; i < 100; i++) {
+            int postf = (int) (Math.random() * 9) + 1;
+            String d = "D" + postf;
+
+            if ("D1".equals(d) || "D2".equals(d)) {
+                hasRelevant = true;
+                filtered++;
+            }
+
+            Message message = session.createMessage();
+            message.setStringProperty("$a", "A1");
+            message.setStringProperty("$d", d);
+            producer.send(topic, message);
+        }
+
+        Message message = session.createMessage();
+        message.setStringProperty("$a", "A1");
+        message.setBooleanProperty("$b", true);
+        message.setBooleanProperty("$c", hasRelevant);
+        producer.send(topic, message);
+
+        if (hasRelevant)
+            filtered++;
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        Thread.sleep(3 * 1000);
+
+        // test non-durable consumer
+        session4.close();
+        con4.close();
+        assertEquals(filtered, listener4.count); // succeeded!
+
+        // test online subs
+        session2.close();
+        con2.close();
+        assertEquals(filtered, listener2.count); // succeeded!
+
+        // test offline 1
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+        DurableSubscriptionOfflineTestListener listener = new FilterCheckListener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+        session.close();
+        con.close();
+
+        assertEquals(filtered, listener.count);
+
+        // test offline 2
+        Connection con3 = createConnection("offCli2");
+        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
+        DurableSubscriptionOfflineTestListener listener3 = new FilterCheckListener();
+        consumer3.setMessageListener(listener3);
+
+        Thread.sleep(3 * 1000);
+        session3.close();
+        con3.close();
+
+        assertEquals(filtered, listener3.count);
+        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+
+        if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
+            // https://issues.apache.org/jira/browse/AMQ-4296
+            return;
+        }
+
+        // create offline subs 1
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // create offline subs 2
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int filtered = 0;
+        for (int i = 0; i < 10; i++) {
+            boolean filter = (int) (Math.random() * 2) >= 1;
+            if (filter)
+                filtered++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("sent: " + filtered);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // restart broker
+        Thread.sleep(3 * 1000);
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // send more messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(null);
+
+        for (int i = 0; i < 10; i++) {
+            boolean filter = (int) (Math.random() * 2) >= 1;
+            if (filter)
+                filtered++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("after restart, total sent with filter='true': " + filtered);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // test offline subs
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("1>");
+        consumer.setMessageListener(listener);
+
+        Connection con3 = createConnection("offCli2");
+        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener3 = new DurableSubscriptionOfflineTestListener();
+        consumer3.setMessageListener(listener3);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+        session3.close();
+        con3.close();
+
+        assertEquals(filtered, listener.count);
+        assertEquals(filtered, listener3.count);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOfflineSubscriptionAfterRestart() throws Exception {
+        // create offline subs 1
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+
+        // send messages
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("sent: " + sent);
+        Thread.sleep(5 * 1000);
+        session.close();
+        con.close();
+
+        assertEquals(sent, listener.count);
+
+        // restart broker
+        Thread.sleep(3 * 1000);
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // send more messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(null);
+
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("after restart, sent: " + sent);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // test offline subs
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals(sent, listener.count);
+    }
+
+    public class FilterCheckListener extends DurableSubscriptionOfflineTestListener  {
+
+        @Override
+        public void onMessage(Message message) {
+            count++;
+
+            try {
+                Object b = message.getObjectProperty("$b");
+                if (b != null) {
+                    boolean c = message.getBooleanProperty("$c");
+                    assertTrue("", c);
+                } else {
+                    String d = message.getStringProperty("$d");
+                    assertTrue("", "D1".equals(d) || "D2".equals(d));
+                }
+            }
+            catch (JMSException e) {
+                e.printStackTrace();
+                exceptions.add(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
new file mode 100644
index 0000000..09c50d0
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline4Test extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline4Test.class);
+
+    @Parameterized.Parameters(name = "keepDurableSubsActive_{0}")
+    public static Collection<Boolean[]> getTestParameters() {
+        Boolean[] f = {Boolean.FALSE};
+        Boolean[] t = {Boolean.TRUE};
+        List<Boolean[]> booleanChoices = new ArrayList<Boolean[]>();
+        booleanChoices.add(f);
+        booleanChoices.add(t);
+
+        return booleanChoices;
+    }
+
+    public DurableSubscriptionOffline4Test(Boolean keepDurableSubsActive) {
+        this.journalMaxFileLength = 64 * 1024;
+        this.keepDurableSubsActive = keepDurableSubsActive.booleanValue();
+
+        LOG.info(">>>> running {} with keepDurableSubsActive: {}, journalMaxFileLength", testName.getMethodName(), this.keepDurableSubsActive, journalMaxFileLength);
+    }
+
+
+    @Test(timeout = 60 * 1000)
+    // https://issues.apache.org/jira/browse/AMQ-3206
+    public void testCleanupDeletedSubAfterRestart() throws Exception {
+        Connection con = createConnection("cli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        final int toSend = 500;
+        final String payload = new byte[40*1024].toString();
+        int sent = 0;
+        for (int i = sent; i < toSend; i++) {
+            Message message = session.createTextMessage(payload);
+            message.setStringProperty("filter", "false");
+            message.setIntProperty("ID", i);
+            producer.send(topic, message);
+            sent++;
+        }
+        con.close();
+        LOG.info("sent: " + sent);
+
+        // kill off cli1
+        con = createConnection("cli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.unsubscribe("SubsId");
+
+        destroyBroker();
+        createBroker(false);
+
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+        final DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+        assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Want: " + toSend  + ", current: " + listener.count);
+                return listener.count == toSend;
+            }
+        }));
+        session.close();
+        con.close();
+
+        destroyBroker();
+        createBroker(false);
+        final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        assertTrue("Should have less than three journal files left but was: " +
+                pa.getStore().getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return pa.getStore().getJournal().getFileMap().size() <= 3;
+            }
+        }));
+    }
+}
+