You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2013/12/19 14:37:22 UTC

[2/2] git commit: For AMQ-4874, broke into multiple parts, and converted to use JUnit4 Parameterized instead of CombinationTestSupport

For AMQ-4874, broke into multiple parts, and converted to use JUnit4 Parameterized instead of CombinationTestSupport


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/57f5d49a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/57f5d49a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/57f5d49a

Branch: refs/heads/trunk
Commit: 57f5d49ae9a76f2a3a17fa7c7966130c7a7352fe
Parents: a64976a
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Thu Dec 19 14:37:12 2013 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Thu Dec 19 14:37:12 2013 +0100

----------------------------------------------------------------------
 .../DurableSubscriptionOffline1Test.java        | 248 +++++
 .../DurableSubscriptionOffline2Test.java        | 171 ++++
 .../DurableSubscriptionOffline3Test.java        | 424 +++++++++
 .../DurableSubscriptionOffline4Test.java        | 131 +++
 .../DurableSubscriptionOfflineTest.java         | 941 +------------------
 .../DurableSubscriptionOfflineTestBase.java     | 221 +++++
 6 files changed, 1243 insertions(+), 893 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
new file mode 100644
index 0000000..67745f9
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.actors.threadpool.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline1Test extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline1Test.class);
+
+    /**
+     * Builds the parameter sets for the Parameterized runner: every selected
+     * persistence adapter combined with priority support switched off and on.
+     * The sets are ordered with all adapters for {@code FALSE} first, then all
+     * adapters for {@code TRUE}.
+     *
+     * @return one {adapter, usePrioritySupport} pair per test instance
+     */
+    @Parameterized.Parameters(name = "{0}-{1}")
+    public static Collection<Object[]> getTestParameters() {
+        String osName = System.getProperty("os.name");
+        LOG.debug("Running on [" + osName + "]");
+
+        List<PersistenceAdapterChoice> persistenceAdapterChoices = new ArrayList<PersistenceAdapterChoice>();
+        persistenceAdapterChoices.add(PersistenceAdapterChoice.KahaDB);
+        persistenceAdapterChoices.add(PersistenceAdapterChoice.JDBC);
+        // LevelDB is left out when running on AIX or SunOS.
+        if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
+            persistenceAdapterChoices.add(PersistenceAdapterChoice.LevelDB);
+        }
+
+        // Iterate the array directly: wrapping it in a List (via an Arrays helper)
+        // added nothing and produced an unchecked raw-type conversion.
+        Boolean[] booleanValues = {Boolean.FALSE, Boolean.TRUE};
+        List<Object[]> testParameters = new ArrayList<Object[]>();
+        for (Boolean booleanValue : booleanValues) {
+            for (PersistenceAdapterChoice persistenceAdapterChoice : persistenceAdapterChoices) {
+                testParameters.add(new Object[] {persistenceAdapterChoice, booleanValue});
+            }
+        }
+
+        return testParameters;
+    }
+
+    public DurableSubscriptionOffline1Test(PersistenceAdapterChoice adapter, Boolean usePrioritySupport) {
+        this.defaultPersistenceAdapter = adapter;
+        this.usePrioritySupport = usePrioritySupport.booleanValue();
+        LOG.debug(">>>> Created with adapter {} usePrioritySupport? {}", defaultPersistenceAdapter, usePrioritySupport);
+
+    }
+
+    @Test // Only messages matching the durable subscription's selector should be delivered once it re-attaches.
+    public void testConsumeOnlyMatchedMessages() throws Exception {
+        // create the durable subscription with selector filter = 'true', then close it so it is offline
+        Connection con = createConnection();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send 10 messages on a fresh connection; the odd-numbered ones carry filter="true"
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null); // no default destination; the topic is passed to send()
+
+        int sent = 0; // counts only the messages expected to match the selector
+        for (int i = 0; i < 10; i++) {
+            boolean filter = i % 2 == 1;
+            if (filter)
+                sent++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        session.close();
+        con.close();
+
+        // re-attach the same durable subscription and collect deliveries through a listener
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000); // fixed wait for asynchronous delivery
+
+        session.close();
+        con.close();
+
+        assertEquals(sent, listener.count); // presumably count is the number of messages the listener received -- confirm in the listener class
+    }
+
+    @Test // After every matching message has been consumed, re-attaching the subscription must deliver nothing.
+    public void testVerifyAllConsumedAreAcked() throws Exception {
+        // create the durable subscription with selector filter = 'true', then close it so it is offline
+        Connection con = createConnection();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send 10 messages, all carrying filter="true"
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null); // no default destination; the topic is passed to send()
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000); // short pause before the producer connection is closed
+
+        session.close();
+        con.close();
+
+        // first attach: consume everything that was sent (sessions use AUTO_ACKNOWLEDGE)
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000); // fixed wait for asynchronous delivery
+
+        session.close();
+        con.close();
+
+        LOG.info("Consumed: " + listener.count);
+        assertEquals(sent, listener.count);
+
+        // second attach with a fresh listener: nothing should be delivered again
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000); // same fixed wait, giving any unexpected redelivery time to arrive
+
+        session.close();
+        con.close();
+
+        assertEquals(0, listener.count);
+    }
+
+    @Test // Offline durable subs must still get every message after an online sub consumed them and the broker restarted.
+    public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+        Connection con = createConnection("offCli1"); // argument is presumably the client id -- confirm in createConnection(String)
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close(); // first subscription is now offline
+
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close(); // second subscription is now offline
+
+        Connection con2 = createConnection("onlineCli1"); // this subscriber stays connected while messages are sent
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+        consumer2.setMessageListener(listener2);
+
+        // send 10 messages, all carrying filter="true", on a separate producer connection
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // the online subscriber must have received every message by the time it is closed
+        Thread.sleep(3 * 1000);
+        session2.close();
+        con2.close();
+        assertEquals(sent, listener2.count);
+
+        // restart the broker, passing false for deleteAllMessages
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // re-attach both offline subscriptions; each must receive the full set
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+        Connection con3 = createConnection("offCli2");
+        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+        DurableSubscriptionOfflineTestListener listener3 = new DurableSubscriptionOfflineTestListener();
+        consumer3.setMessageListener(listener3);
+
+        Thread.sleep(3 * 1000); // fixed wait for asynchronous delivery to both listeners
+
+        session.close();
+        con.close();
+        session3.close();
+        con3.close();
+
+        assertEquals(sent, listener.count);
+        assertEquals(sent, listener3.count);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
new file mode 100644
index 0000000..960d9ea
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline2Test.class);
+
+    /**
+     * Parameter sets for the Parameterized runner: one Boolean per set, first
+     * {@code FALSE} then {@code TRUE}, handed to the constructor as
+     * keepDurableSubsActive.
+     *
+     * @return the two single-element parameter arrays
+     */
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Boolean[]> getTestParameters() {
+        List<Boolean[]> parameterSets = new ArrayList<Boolean[]>();
+        for (Boolean keepActive : new Boolean[] {Boolean.FALSE, Boolean.TRUE}) {
+            parameterSets.add(new Boolean[] {keepActive});
+        }
+        return parameterSets;
+    }
+
+    public DurableSubscriptionOffline2Test(Boolean keepDurableSubsActive) {
+        this.keepDurableSubsActive = keepDurableSubsActive.booleanValue();
+
+        LOG.info(">>>> running {} with keepDurableSubsActive: {}", testName.getMethodName(), this.keepDurableSubsActive);
+    }
+
+
+    /**
+     * Checks the JMX counters of a durable subscription and of its topic at three
+     * points: while the subscription is active with half of the messages consumed,
+     * after it has gone inactive, and after it has re-attached and consumed the rest.
+     * Several expected subscription counters depend on {@code keepDurableSubsActive}.
+     *
+     * @throws Exception on any JMS, JMX or broker failure
+     */
+    @Test(timeout = 60 * 1000)
+    public void testJMXCountersWithOfflineSubs() throws Exception {
+        // create durable subscription 1, then close it so it is offline
+        Connection con = createConnection("cliId1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        // restart broker, passing false for deleteAllMessages
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // send 10 messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            producer.send(topic, message);
+        }
+        session.close();
+        con.close();
+
+        // consume the first half of the messages
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+        for (int i=0; i<sent/2; i++) {
+            Message m =  consumer.receive(4000);
+            assertNotNull("got message: " + i, m);
+            LOG.info("Got :" + i + ", " + m);
+        }
+
+        // check some counters while active
+        ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+        LOG.info("active durable sub name: " + activeDurableSubName);
+        final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+        assertTrue("is active", durableSubscriptionView.isActive());
+        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
+        // polled through Wait.waitFor rather than asserted once
+        assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
+            }
+        }));
+        assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
+
+
+        ObjectName destinationName = broker.getAdminView().getTopics()[0];
+        TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
+        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+        assertEquals("inflight", 5, topicView.getInFlightCount());
+
+        session.close();
+        con.close();
+
+        // check some counters when inactive
+        ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
+        LOG.info("inactive durable sub name: " + inActiveDurableSubName);
+        DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+        assertTrue("is not active", !durableSubscriptionView1.isActive());
+        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
+        assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
+        assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
+
+        // destination view
+        assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+        assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
+
+        // consume the rest
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+        for (int i=0; i<sent/2;i++) {
+            Message m =  consumer.receive(30000);
+            assertNotNull("got message: " + i, m);
+            LOG.info("Got :" + i + ", " + m);
+        }
+
+        activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+        LOG.info("durable sub name: " + activeDurableSubName);
+        final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+        assertTrue("is active", durableSubscriptionView2.isActive());
+        assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
+        assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                long val = durableSubscriptionView2.getDequeueCounter();
+                LOG.info("dequeue count:" + val);
+                return 10 == val;
+            }
+        }));
+
+        // release the last session and connection, as is done for every earlier one in this test
+        session.close();
+        con.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
new file mode 100644
index 0000000..c0aee13
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline3Test extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline3Test.class);
+
+    /**
+     * Parameter sets for the Parameterized runner: one persistence adapter per set.
+     * KahaDB and JDBC are always included; LevelDB is added unless the
+     * {@code os.name} system property is AIX or SunOS (case-insensitive).
+     *
+     * @return the single-element parameter arrays, in the order KahaDB, JDBC, LevelDB
+     */
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<PersistenceAdapterChoice[]> getTestParameters() {
+        final String os = System.getProperty("os.name");
+        LOG.debug("Running on [" + os + "]");
+
+        List<PersistenceAdapterChoice[]> adapters = new ArrayList<PersistenceAdapterChoice[]>();
+        adapters.add(new PersistenceAdapterChoice[] {PersistenceAdapterChoice.KahaDB});
+        adapters.add(new PersistenceAdapterChoice[] {PersistenceAdapterChoice.JDBC});
+
+        boolean skipLevelDb = os.equalsIgnoreCase("AIX") || os.equalsIgnoreCase("SunOS");
+        if (!skipLevelDb) {
+            adapters.add(new PersistenceAdapterChoice[] {PersistenceAdapterChoice.LevelDB});
+        }
+
+        return adapters;
+    }
+
+    public DurableSubscriptionOffline3Test(PersistenceAdapterChoice persistenceAdapterChoice) {
+        this.defaultPersistenceAdapter = persistenceAdapterChoice;
+
+        LOG.info(">>>> running {} with persistenceAdapterChoice: {}", testName.getMethodName(), this.defaultPersistenceAdapter);
+    }
+
+    /**
+     * Interleaves two durable subscriptions on the same topic: "cliId1" subscribes
+     * before any message is sent, "cliId2" subscribes after the first ten messages.
+     * cliId2 is expected to see only the second batch of ten, while cliId1 is
+     * expected to receive all twenty once it re-attaches.
+     *
+     * @throws Exception on any JMS or broker failure
+     */
+    @Test(timeout = 60 * 1000)
+    public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
+        // create durable subscription 1, then close it so it is offline
+        Connection con = createConnection("cliId1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send the first batch of 10 messages; the producer connection stays open for the second batch
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+
+        // create durable subscription 2 after the first batch
+        Connection con2 = createConnection("cliId2");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+        consumer2.setMessageListener(listener2);
+
+        // nothing has been sent since this subscription was created
+        assertEquals(0, listener2.count);
+        session2.close();
+        con2.close();
+
+        // send some more: second batch of 10
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        con2 = createConnection("cliId2");
+        session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        listener2 = new DurableSubscriptionOfflineTestListener("cliId2");
+        consumer2.setMessageListener(listener2);
+        // subscription 2 must see only the second batch
+        Thread.sleep(3 * 1000);
+
+        assertEquals(10, listener2.count);
+
+        // consume all messages with subscription 1
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("cliId1");
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        // the re-opened cliId2 session and connection were previously left open; release them too
+        session2.close();
+        con2.close();
+
+        assertEquals("offline consumer got all", sent, listener.count);
+    }
+
+    private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))"; // selector shared by every subscriber and consumer in testMixOfOnLineAndOfflineSubsGetAllMatched
+    @Test(timeout = 60 * 1000) // Two offline durable subs, one online durable sub and one non-durable consumer must all get exactly the matching messages.
+    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
+        // create offline subs 1: subscribe with the shared selector, then close
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        // create offline subs 2: same selector, different connection argument
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        // create online subs: stays connected while messages are sent
+        Connection con2 = createConnection("onlineCli1");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
+        DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+        consumer2.setMessageListener(listener2);
+
+        // create non-durable consumer with the same selector
+        Connection con4 = createConnection("nondurableCli");
+        Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
+        DurableSubscriptionOfflineTestListener listener4 = new DurableSubscriptionOfflineTestListener();
+        consumer4.setMessageListener(listener4);
+
+        // send 100 messages with $a='A1' and a random $d of D1..D9; only D1 and D2 satisfy the selector
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        boolean hasRelevant = false; // becomes true once at least one matching message has been sent
+        int filtered = 0; // number of sent messages expected to satisfy the selector
+        for (int i = 0; i < 100; i++) {
+            int postf = (int) (Math.random() * 9) + 1; // random integer in 1..9
+            String d = "D" + postf;
+
+            if ("D1".equals(d) || "D2".equals(d)) {
+                hasRelevant = true;
+                filtered++;
+            }
+
+            Message message = session.createMessage();
+            message.setStringProperty("$a", "A1");
+            message.setStringProperty("$d", d);
+            producer.send(topic, message);
+        }
+
+        Message message = session.createMessage(); // final message has no $d, so it can only match through the $b/$c branch
+        message.setStringProperty("$a", "A1");
+        message.setBooleanProperty("$b", true);
+        message.setBooleanProperty("$c", hasRelevant); // hence it satisfies the selector exactly when hasRelevant is true
+        producer.send(topic, message);
+
+        if (hasRelevant)
+            filtered++;
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        Thread.sleep(3 * 1000); // fixed wait for asynchronous delivery to the two connected consumers
+
+        // test non-durable consumer
+        session4.close();
+        con4.close();
+        assertEquals(filtered, listener4.count); // non-durable consumer is expected to have seen every matching message
+
+        // test online subs
+        session2.close();
+        con2.close();
+        assertEquals(filtered, listener2.count); // online durable subscriber is expected to have seen every matching message
+
+        // test offline 1: re-attach and expect the same number of matching messages
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+        DurableSubscriptionOfflineTestListener listener = new FilterCheckListener(); // FilterCheckListener is defined outside this method; presumably it validates each message against the selector -- confirm
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+        session.close();
+        con.close();
+
+        assertEquals(filtered, listener.count);
+
+        // test offline 2: same check for the second offline subscription
+        Connection con3 = createConnection("offCli2");
+        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
+        DurableSubscriptionOfflineTestListener listener3 = new FilterCheckListener();
+        consumer3.setMessageListener(listener3);
+
+        Thread.sleep(3 * 1000);
+        session3.close();
+        con3.close();
+
+        assertEquals(filtered, listener3.count);
+        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty()); // exceptions is declared outside this method; presumably populated by the listeners -- confirm
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+
+        if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
+            // https://issues.apache.org/jira/browse/AMQ-4296
+            return;
+        }
+
+        // create offline subs 1
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // create offline subs 2
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int filtered = 0;
+        for (int i = 0; i < 10; i++) {
+            boolean filter = (int) (Math.random() * 2) >= 1;
+            if (filter)
+                filtered++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("sent: " + filtered);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // restart broker
+        Thread.sleep(3 * 1000);
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // send more messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(null);
+
+        for (int i = 0; i < 10; i++) {
+            boolean filter = (int) (Math.random() * 2) >= 1;
+            if (filter)
+                filtered++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("after restart, total sent with filter='true': " + filtered);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // test offline subs
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("1>");
+        consumer.setMessageListener(listener);
+
+        Connection con3 = createConnection("offCli2");
+        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        DurableSubscriptionOfflineTestListener listener3 = new DurableSubscriptionOfflineTestListener();
+        consumer3.setMessageListener(listener3);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+        session3.close();
+        con3.close();
+
+        assertEquals(filtered, listener.count);
+        assertEquals(filtered, listener3.count);
+    }
+
+    /**
+     * A durable subscriber receives ten messages while online, the broker is restarted
+     * via {@code createBroker(false)}, and ten more messages are published while the
+     * subscriber is disconnected. After the subscriber reconnects with the same listener,
+     * the listener's count must equal the total number of messages sent.
+     */
+    @Test(timeout = 60 * 1000)
+    public void testOfflineSubscriptionAfterRestart() throws Exception {
+        final String clientId = "offCli1";
+        final String subscriptionName = "SubsId";
+        final int batchSize = 10;
+
+        // phase 1: subscriber is online and the same session publishes
+        Connection onlineCon = createConnection(clientId);
+        Session onlineSession = onlineCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer subscriber = onlineSession.createDurableSubscriber(topic, subscriptionName, null, false);
+        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        subscriber.setMessageListener(listener);
+
+        MessageProducer onlineProducer = onlineSession.createProducer(null);
+
+        int sent = 0;
+        while (sent < batchSize) {
+            sent++;
+            Message outgoing = onlineSession.createMessage();
+            outgoing.setStringProperty("filter", "false");
+            onlineProducer.send(topic, outgoing);
+        }
+
+        LOG.info("sent: " + sent);
+        Thread.sleep(5 * 1000);
+        onlineSession.close();
+        onlineCon.close();
+
+        assertEquals(sent, listener.count);
+
+        // restart broker
+        Thread.sleep(3 * 1000);
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // phase 2: publish another batch while the subscriber is offline
+        Connection senderCon = createConnection();
+        Session senderSession = senderCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer offlineProducer = senderSession.createProducer(null);
+
+        final int expectedTotal = sent + batchSize;
+        while (sent < expectedTotal) {
+            sent++;
+            Message outgoing = senderSession.createMessage();
+            outgoing.setStringProperty("filter", "false");
+            offlineProducer.send(topic, outgoing);
+        }
+
+        LOG.info("after restart, sent: " + sent);
+        Thread.sleep(1 * 1000);
+        senderSession.close();
+        senderCon.close();
+
+        // phase 3: reconnect the subscriber, reusing the listener so its count accumulates
+        Connection resumedCon = createConnection(clientId);
+        Session resumedSession = resumedCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer resumedSubscriber = resumedSession.createDurableSubscriber(topic, subscriptionName, null, true);
+        resumedSubscriber.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        resumedSession.close();
+        resumedCon.close();
+
+        assertEquals(sent, listener.count);
+    }
+
+    /**
+     * Listener that counts every delivered message and sanity-checks its properties:
+     * when the {@code $b} property is present, {@code $c} must be {@code true};
+     * otherwise {@code $d} must be either {@code "D1"} or {@code "D2"}.
+     * Any {@link JMSException} raised while reading properties is printed and recorded
+     * in the enclosing test's {@code exceptions} collection.
+     */
+    public class FilterCheckListener extends DurableSubscriptionOfflineTestListener  {
+
+        @Override
+        public void onMessage(Message message) {
+            // every delivery is counted, whether or not the checks below pass
+            count++;
+
+            try {
+                if (message.getObjectProperty("$b") == null) {
+                    // no $b on the message: it must carry one of the two accepted $d values
+                    String dValue = message.getStringProperty("$d");
+                    boolean acceptedD = "D1".equals(dValue) || "D2".equals(dValue);
+                    assertTrue("", acceptedD);
+                    return;
+                }
+
+                // $b present: $c has to be true
+                boolean cValue = message.getBooleanProperty("$c");
+                assertTrue("", cValue);
+            }
+            catch (JMSException jmsFailure) {
+                jmsFailure.printStackTrace();
+                exceptions.add(jmsFailure);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
new file mode 100644
index 0000000..09c50d0
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Regression test for AMQ-3206: journal data that is only referenced by an
+ * unsubscribed durable subscription must be reclaimed after broker restarts.
+ * <p>
+ * The test runs once for each {@code keepDurableSubsActive} value returned by
+ * {@link #getTestParameters()} and uses a small journal file length so that the
+ * published messages span many journal files.
+ */
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline4Test extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline4Test.class);
+
+    /** Number of characters in the text body of each message published by the test. */
+    private static final int PAYLOAD_LENGTH = 40 * 1024;
+
+    /**
+     * Supplies the constructor arguments for the parameterized runner.
+     *
+     * @return two single-element rows: {@code false}, then {@code true}
+     */
+    @Parameterized.Parameters(name = "keepDurableSubsActive_{0}")
+    public static Collection<Boolean[]> getTestParameters() {
+        List<Boolean[]> booleanChoices = new ArrayList<Boolean[]>();
+        booleanChoices.add(new Boolean[] {Boolean.FALSE});
+        booleanChoices.add(new Boolean[] {Boolean.TRUE});
+        return booleanChoices;
+    }
+
+    /**
+     * Configures the inherited broker settings for one parameterized run.
+     *
+     * @param keepDurableSubsActive value copied into the inherited
+     *                              {@code keepDurableSubsActive} field
+     */
+    public DurableSubscriptionOffline4Test(Boolean keepDurableSubsActive) {
+        this.journalMaxFileLength = 64 * 1024;
+        this.keepDurableSubsActive = keepDurableSubsActive.booleanValue();
+
+        // NOTE(review): testName is declared outside this file; if it is a JUnit TestName
+        // rule it is not populated yet at construction time, so this may log null - confirm.
+        // The format string now has a placeholder for every argument (the last one was missing).
+        LOG.info(">>>> running {} with keepDurableSubsActive: {}, journalMaxFileLength: {}",
+                testName.getMethodName(), this.keepDurableSubsActive, journalMaxFileLength);
+    }
+
+
+    /**
+     * Creates two durable subscriptions, publishes 500 large messages, unsubscribes the
+     * first client, restarts the broker, lets the second client consume everything,
+     * restarts again and then expects at most three journal files to remain.
+     */
+    @Test(timeout = 60 * 1000)
+    // https://issues.apache.org/jira/browse/AMQ-3206
+    public void testCleanupDeletedSubAfterRestart() throws Exception {
+        Connection con = createConnection("cli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        final int toSend = 500;
+        // Build a body that really is PAYLOAD_LENGTH characters long. Calling toString() on a
+        // byte[] only yields the array's identity string, which left the messages tiny.
+        final String payload = createPayload(PAYLOAD_LENGTH);
+        int sent = 0;
+        for (int i = 0; i < toSend; i++) {
+            Message message = session.createTextMessage(payload);
+            message.setStringProperty("filter", "false");
+            message.setIntProperty("ID", i);
+            producer.send(topic, message);
+            sent++;
+        }
+        con.close();
+        LOG.info("sent: " + sent);
+
+        // kill off cli1
+        con = createConnection("cli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.unsubscribe("SubsId");
+        // release the client before the broker goes away instead of leaking the connection
+        session.close();
+        con.close();
+
+        destroyBroker();
+        createBroker(false);
+
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+        final DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+        assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Want: " + toSend  + ", current: " + listener.count);
+                return listener.count == toSend;
+            }
+        }));
+        session.close();
+        con.close();
+
+        destroyBroker();
+        createBroker(false);
+        final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        boolean journalCleanedUp = Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return pa.getStore().getJournal().getFileMap().size() <= 3;
+            }
+        });
+        // read the file count after waiting so a failure reports the final state
+        assertTrue("Should have at most three journal files left but was: " +
+                pa.getStore().getJournal().getFileMap().size(), journalCleanedUp);
+    }
+
+    /**
+     * Builds a text payload of the requested size.
+     *
+     * @param length number of characters in the returned string
+     * @return a string consisting of {@code length} copies of {@code 'x'}
+     */
+    private static String createPayload(int length) {
+        StringBuilder body = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            body.append('x');
+        }
+        return body.toString();
+    }
+}
+