You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2013/12/19 14:37:22 UTC
[2/2] git commit: For AMQ-4874, broke into multiple parts,
and converted to use JUnit4 Parameterized instead of
CombinationTestSupport
For AMQ-4874, broke into multiple parts, and converted to use JUnit4 Parameterized instead of CombinationTestSupport
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/57f5d49a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/57f5d49a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/57f5d49a
Branch: refs/heads/trunk
Commit: 57f5d49ae9a76f2a3a17fa7c7966130c7a7352fe
Parents: a64976a
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Thu Dec 19 14:37:12 2013 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Thu Dec 19 14:37:12 2013 +0100
----------------------------------------------------------------------
.../DurableSubscriptionOffline1Test.java | 248 +++++
.../DurableSubscriptionOffline2Test.java | 171 ++++
.../DurableSubscriptionOffline3Test.java | 424 +++++++++
.../DurableSubscriptionOffline4Test.java | 131 +++
.../DurableSubscriptionOfflineTest.java | 941 +------------------
.../DurableSubscriptionOfflineTestBase.java | 221 +++++
6 files changed, 1243 insertions(+), 893 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
new file mode 100644
index 0000000..67745f9
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.actors.threadpool.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline1Test extends DurableSubscriptionOfflineTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline1Test.class);
+
+ /**
+  * Builds the parameter matrix for the {@link Parameterized} runner: every
+  * selected persistence adapter combined with priority support off and on.
+  * LevelDB is left out when {@code os.name} is AIX or SunOS.
+  *
+  * @return one {@code {adapter, usePrioritySupport}} pair per test run, ordered
+  *         by the boolean first (false, then true) and the adapter second
+  */
+ @Parameterized.Parameters(name = "{0}-{1}")
+ public static Collection<Object[]> getTestParameters() {
+     String osName = System.getProperty("os.name");
+     LOG.debug("Running on [{}]", osName);
+
+     List<PersistenceAdapterChoice> persistenceAdapterChoices = new ArrayList<PersistenceAdapterChoice>();
+     persistenceAdapterChoices.add(PersistenceAdapterChoice.KahaDB);
+     persistenceAdapterChoices.add(PersistenceAdapterChoice.JDBC);
+     // Constant-first comparison stays safe even if os.name were unset.
+     if (!"AIX".equalsIgnoreCase(osName) && !"SunOS".equalsIgnoreCase(osName)) {
+         persistenceAdapterChoices.add(PersistenceAdapterChoice.LevelDB);
+     }
+
+     // Iterate the array directly; wrapping it in a List added nothing and tied
+     // this method to the non-JDK Arrays class this file imports.
+     Boolean[] booleanValues = {Boolean.FALSE, Boolean.TRUE};
+     List<Object[]> testParameters = new ArrayList<Object[]>();
+     for (Boolean booleanValue : booleanValues) {
+         for (PersistenceAdapterChoice persistenceAdapterChoice : persistenceAdapterChoices) {
+             testParameters.add(new Object[] {persistenceAdapterChoice, booleanValue});
+         }
+     }
+
+     return testParameters;
+ }
+
+ /**
+  * Receives one row of the parameter matrix produced by getTestParameters().
+  *
+  * @param adapter            stored in the defaultPersistenceAdapter field
+  * @param usePrioritySupport stored in the usePrioritySupport field
+  */
+ public DurableSubscriptionOffline1Test(PersistenceAdapterChoice adapter, Boolean usePrioritySupport) {
+     this.usePrioritySupport = usePrioritySupport;
+     this.defaultPersistenceAdapter = adapter;
+     LOG.debug(">>>> Created with adapter {} usePrioritySupport? {}", adapter, usePrioritySupport);
+ }
+
+ /**
+  * Registers a durable subscription with the selector {@code filter = 'true'},
+  * disconnects, publishes ten messages of which the odd-numbered ones carry
+  * filter=true, then re-attaches the subscriber and asserts that the listener
+  * counted exactly the matching messages.
+  */
+ @Test
+ public void testConsumeOnlyMatchedMessages() throws Exception {
+ // create durable subscription
+ Connection con = createConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ // "sent" only counts the messages that match the subscription's selector
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ boolean filter = i % 2 == 1;
+ if (filter)
+ sent++;
+
+ Message message = session.createMessage();
+ message.setStringProperty("filter", filter ? "true" : "false");
+ producer.send(topic, message);
+ }
+
+ session.close();
+ con.close();
+
+ // consume messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+
+ // fixed wait for asynchronous delivery to the listener before disconnecting
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals(sent, listener.count);
+ }
+
+ /**
+  * Publishes ten matching messages to an offline durable subscription, consumes
+  * them with an AUTO_ACKNOWLEDGE session, then attaches to the same subscription
+  * a second time and asserts that nothing is delivered again.
+  */
+ @Test
+ public void testVerifyAllConsumedAreAcked() throws Exception {
+ // create durable subscription
+ Connection con = createConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ // every message carries filter=true, so all ten match the selector
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+
+ session.close();
+ con.close();
+
+ // consume messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+
+ // fixed wait for asynchronous delivery to the listener before disconnecting
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ LOG.info("Consumed: " + listener.count);
+ assertEquals(sent, listener.count);
+
+ // consume messages again, should not get any
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ // fresh listener so the count starts from zero for the second pass
+ listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals(0, listener.count);
+ }
+
+ /**
+  * Creates two offline durable subscriptions ("offCli1", "offCli2") and one
+  * online durable subscriber ("onlineCli1"), publishes ten matching messages,
+  * verifies the online subscriber received them all, restarts the broker
+  * without deleting messages, and finally verifies that both offline
+  * subscriptions still receive all ten messages.
+  */
+ @Test
+ public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+ // first offline subscription (the string argument is presumably the client id - confirm in createConnection)
+ Connection con = createConnection("offCli1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // second offline subscription
+ con = createConnection("offCli2");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // online subscriber: stays connected while the messages are published
+ Connection con2 = createConnection("onlineCli1");
+ Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+ consumer2.setMessageListener(listener2);
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // test online subs
+ Thread.sleep(3 * 1000);
+ session2.close();
+ con2.close();
+ assertEquals(sent, listener2.count);
+
+ // restart broker
+ broker.stop();
+ createBroker(false /*deleteAllMessages*/);
+
+ // test offline
+ con = createConnection("offCli1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+ Connection con3 = createConnection("offCli2");
+ Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+ // each offline subscription gets its own listener so the counts are independent
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+ DurableSubscriptionOfflineTestListener listener3 = new DurableSubscriptionOfflineTestListener();
+ consumer3.setMessageListener(listener3);
+
+ // fixed wait for asynchronous delivery to both listeners
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+ session3.close();
+ con3.close();
+
+ assertEquals(sent, listener.count);
+ assertEquals(sent, listener3.count);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
new file mode 100644
index 0000000..960d9ea
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline2Test.class);
+
+ /**
+  * Supplies the two runs of this class: the single constructor argument is
+  * first {@code false}, then {@code true}.
+  *
+  * @return two one-element Boolean arrays, FALSE first
+  */
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Boolean[]> getTestParameters() {
+     List<Boolean[]> parameterRows = new ArrayList<Boolean[]>();
+     parameterRows.add(new Boolean[] {Boolean.FALSE});
+     parameterRows.add(new Boolean[] {Boolean.TRUE});
+     return parameterRows;
+ }
+
+ /**
+  * @param keepDurableSubsActive value supplied by the Parameterized runner and
+  *        copied into the keepDurableSubsActive field
+  */
+ public DurableSubscriptionOffline2Test(Boolean keepDurableSubsActive) {
+     boolean keepActive = keepDurableSubsActive.booleanValue();
+     this.keepDurableSubsActive = keepActive;
+
+     // NOTE(review): testName is declared outside this file; if it is a JUnit
+     // TestName rule it may not be populated yet at construction time, in which
+     // case the method name logs as null - confirm.
+     String methodName = testName.getMethodName();
+     LOG.info(">>>> running {} with keepDurableSubsActive: {}", methodName, keepActive);
+ }
+
+
+ /**
+  * Checks the JMX counters of a durable subscription and of its topic in three
+  * phases: while the subscriber is active and has consumed half of the ten
+  * published messages, after it has disconnected, and after it has reconnected
+  * and consumed the remaining half. The expected subscription enqueue/dequeue
+  * values depend on the keepDurableSubsActive parameter.
+  */
+ @Test(timeout = 60 * 1000)
+ public void testJMXCountersWithOfflineSubs() throws Exception {
+ // create durable subscription 1
+ Connection con = createConnection("cliId1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", null, true);
+ session.close();
+ con.close();
+
+ // restart broker
+ broker.stop();
+ createBroker(false /*deleteAllMessages*/);
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ producer.send(topic, message);
+ }
+ session.close();
+ con.close();
+
+ // consume some messages
+ con = createConnection("cliId1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+ // synchronously receive the first half only
+ for (int i=0; i<sent/2; i++) {
+ Message m = consumer.receive(4000);
+ assertNotNull("got message: " + i, m);
+ LOG.info("Got :" + i + ", " + m);
+ }
+
+ // check some counters while active
+ ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+ LOG.info("active durable sub name: " + activeDurableSubName);
+ final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+ assertTrue("is active", durableSubscriptionView.isActive());
+ assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
+ // polled through Wait.waitFor rather than asserted once, so the value may settle asynchronously
+ assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
+ }
+ }));
+ assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
+
+
+ // topic-level counters while the subscriber is still connected
+ ObjectName destinationName = broker.getAdminView().getTopics()[0];
+ TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
+ assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+ assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+ assertEquals("inflight", 5, topicView.getInFlightCount());
+
+ session.close();
+ con.close();
+
+ // check some counters when inactive
+ ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
+ LOG.info("inactive durable sub name: " + inActiveDurableSubName);
+ DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+ assertTrue("is not active", !durableSubscriptionView1.isActive());
+ assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
+ assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
+ assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
+
+ // destination view
+ assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+ assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+ assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
+
+ // consume the rest
+ con = createConnection("cliId1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+ for (int i=0; i<sent/2;i++) {
+ Message m = consumer.receive(30000);
+ assertNotNull("got message: " + i, m);
+ LOG.info("Got :" + i + ", " + m);
+ }
+
+ // the subscription is active again, so look it up under the active subscribers
+ activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+ LOG.info("durable sub name: " + activeDurableSubName);
+ final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+ assertTrue("is active", durableSubscriptionView2.isActive());
+ assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
+ // wait until the dequeue counter reports all ten messages
+ assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ long val = durableSubscriptionView2.getDequeueCounter();
+ LOG.info("dequeue count:" + val);
+ return 10 == val;
+ }
+ }));
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
new file mode 100644
index 0000000..c0aee13
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline3Test extends DurableSubscriptionOfflineTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline3Test.class);
+
+ /**
+  * Supplies one run per persistence adapter: KahaDB, JDBC and, unless
+  * {@code os.name} is AIX or SunOS, LevelDB.
+  *
+  * @return one single-element array per adapter, in the order listed above
+  */
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<PersistenceAdapterChoice[]> getTestParameters() {
+     final String osName = System.getProperty("os.name");
+     LOG.debug("Running on [" + osName + "]");
+
+     final List<PersistenceAdapterChoice[]> adapterRows = new ArrayList<PersistenceAdapterChoice[]>();
+     adapterRows.add(new PersistenceAdapterChoice[] {PersistenceAdapterChoice.KahaDB});
+     adapterRows.add(new PersistenceAdapterChoice[] {PersistenceAdapterChoice.JDBC});
+
+     final boolean levelDbExcluded = osName.equalsIgnoreCase("AIX") || osName.equalsIgnoreCase("SunOS");
+     if (!levelDbExcluded) {
+         adapterRows.add(new PersistenceAdapterChoice[] {PersistenceAdapterChoice.LevelDB});
+     }
+     return adapterRows;
+ }
+
+ /**
+  * @param persistenceAdapterChoice adapter supplied by the Parameterized runner
+  *        and stored in the defaultPersistenceAdapter field
+  */
+ public DurableSubscriptionOffline3Test(PersistenceAdapterChoice persistenceAdapterChoice) {
+     this.defaultPersistenceAdapter = persistenceAdapterChoice;
+
+     // NOTE(review): testName is declared outside this file; if it is a JUnit
+     // TestName rule it may not be populated yet at construction time, in which
+     // case the method name logs as null - confirm.
+     String methodName = testName.getMethodName();
+     LOG.info(">>>> running {} with persistenceAdapterChoice: {}", methodName, this.defaultPersistenceAdapter);
+ }
+
+ /**
+  * Interleaves subscription creation with publishing: "cliId1" subscribes
+  * before any message is sent, "cliId2" subscribes after the first batch of ten.
+  * After a second batch of ten, cliId2 is expected to receive only the second
+  * batch while cliId1 is expected to receive all twenty.
+  */
+ @Test(timeout = 60 * 1000)
+ public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
+ // create durable subscription 1
+ Connection con = createConnection("cliId1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+
+ // create durable subscription 2
+ Connection con2 = createConnection("cliId2");
+ Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+ consumer2.setMessageListener(listener2);
+
+ // checked immediately, without waiting: the first batch predates this subscription
+ assertEquals(0, listener2.count);
+ session2.close();
+ con2.close();
+
+ // send some more
+ // (the producer connection from the first batch is still open and is reused)
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // re-attach subscription 2 with a fresh listener
+ con2 = createConnection("cliId2");
+ session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ listener2 = new DurableSubscriptionOfflineTestListener("cliId2");
+ consumer2.setMessageListener(listener2);
+ // test online subs
+ Thread.sleep(3 * 1000);
+
+ // only the second batch was published after cliId2 subscribed
+ assertEquals(10, listener2.count);
+
+ // consume all messages
+ con = createConnection("cliId1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("cliId1");
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ // NOTE(review): con2/session2 are not closed in this method - presumably cleaned up by the base class teardown; confirm
+ assertEquals("offline consumer got all", sent, listener.count);
+ }
+
+ private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
+ /**
+  * Publishes 100 messages with a random $d value plus one $b/$c message to a
+  * topic that has two offline durable subscriptions, one online durable
+  * subscriber and one non-durable consumer, all using the class-level selector
+  * {@code filter}. Every consumer is expected to see exactly the number of
+  * messages that match the selector; the offline ones additionally re-check
+  * each delivered message through FilterCheckListener.
+  */
+ @Test(timeout = 60 * 1000)
+ public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
+ // create offline subs 1
+ Connection con = createConnection("offCli1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", filter, true);
+ session.close();
+ con.close();
+
+ // create offline subs 2
+ con = createConnection("offCli2");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", filter, true);
+ session.close();
+ con.close();
+
+ // create online subs
+ Connection con2 = createConnection("onlineCli1");
+ Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
+ DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+ consumer2.setMessageListener(listener2);
+
+ // create non-durable consumer
+ Connection con4 = createConnection("nondurableCli");
+ Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
+ DurableSubscriptionOfflineTestListener listener4 = new DurableSubscriptionOfflineTestListener();
+ consumer4.setMessageListener(listener4);
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ // "filtered" counts the messages expected to pass the selector
+ boolean hasRelevant = false;
+ int filtered = 0;
+ for (int i = 0; i < 100; i++) {
+ // random suffix in the range 1..9, so $d is one of D1..D9
+ int postf = (int) (Math.random() * 9) + 1;
+ String d = "D" + postf;
+
+ if ("D1".equals(d) || "D2".equals(d)) {
+ hasRelevant = true;
+ filtered++;
+ }
+
+ Message message = session.createMessage();
+ message.setStringProperty("$a", "A1");
+ message.setStringProperty("$d", d);
+ producer.send(topic, message);
+ }
+
+ // one extra message without $d: it matches the selector only through the
+ // ($b=true AND $c=true) branch, i.e. only when $c (hasRelevant) is true
+ Message message = session.createMessage();
+ message.setStringProperty("$a", "A1");
+ message.setBooleanProperty("$b", true);
+ message.setBooleanProperty("$c", hasRelevant);
+ producer.send(topic, message);
+
+ if (hasRelevant)
+ filtered++;
+
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ Thread.sleep(3 * 1000);
+
+ // test non-durable consumer
+ session4.close();
+ con4.close();
+ assertEquals(filtered, listener4.count); // succeeded!
+
+ // test online subs
+ session2.close();
+ con2.close();
+ assertEquals(filtered, listener2.count); // succeeded!
+
+ // test offline 1
+ con = createConnection("offCli1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+ DurableSubscriptionOfflineTestListener listener = new FilterCheckListener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+ session.close();
+ con.close();
+
+ assertEquals(filtered, listener.count);
+
+ // test offline 2
+ Connection con3 = createConnection("offCli2");
+ Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
+ DurableSubscriptionOfflineTestListener listener3 = new FilterCheckListener();
+ consumer3.setMessageListener(listener3);
+
+ Thread.sleep(3 * 1000);
+ session3.close();
+ con3.close();
+
+ assertEquals(filtered, listener3.count);
+ // FilterCheckListener records JMSExceptions here instead of throwing them
+ assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
+ }
+
+ /**
+  * Sends two batches of ten messages with a randomly chosen filter property to
+  * two offline durable subscriptions, restarting the broker between the
+  * batches, and asserts that each subscription later receives exactly the
+  * messages that were sent with filter=true. Returns immediately, without
+  * testing anything, when the LevelDB adapter is selected.
+  */
+ @Test(timeout = 60 * 1000)
+ public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+
+ if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
+ // https://issues.apache.org/jira/browse/AMQ-4296
+ return;
+ }
+
+ // create offline subs 1
+ Connection con = createConnection("offCli1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // create offline subs 2
+ con = createConnection("offCli2");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ // "filtered" counts only the messages sent with filter=true
+ int filtered = 0;
+ for (int i = 0; i < 10; i++) {
+ // random true/false per message
+ boolean filter = (int) (Math.random() * 2) >= 1;
+ if (filter)
+ filtered++;
+
+ Message message = session.createMessage();
+ message.setStringProperty("filter", filter ? "true" : "false");
+ producer.send(topic, message);
+ }
+
+ LOG.info("sent: " + filtered);
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // restart broker
+ Thread.sleep(3 * 1000);
+ broker.stop();
+ createBroker(false /*deleteAllMessages*/);
+
+ // send more messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(null);
+
+ // the running total keeps accumulating across the restart
+ for (int i = 0; i < 10; i++) {
+ boolean filter = (int) (Math.random() * 2) >= 1;
+ if (filter)
+ filtered++;
+
+ Message message = session.createMessage();
+ message.setStringProperty("filter", filter ? "true" : "false");
+ producer.send(topic, message);
+ }
+
+ LOG.info("after restart, total sent with filter='true': " + filtered);
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // test offline subs
+ con = createConnection("offCli1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("1>");
+ consumer.setMessageListener(listener);
+
+ Connection con3 = createConnection("offCli2");
+ Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener3 = new DurableSubscriptionOfflineTestListener();
+ consumer3.setMessageListener(listener3);
+
+ // fixed wait for asynchronous delivery to both listeners
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+ session3.close();
+ con3.close();
+
+ assertEquals(filtered, listener.count);
+ assertEquals(filtered, listener3.count);
+ }
+
+ /**
+  * Consumes ten messages online through a durable subscription without a
+  * selector, restarts the broker, publishes ten more while the subscription is
+  * offline, and asserts that re-attaching delivers the second batch. The same
+  * listener instance is used in both phases, so its count is compared against
+  * the running total of all messages sent.
+  */
+ @Test(timeout = 60 * 1000)
+ public void testOfflineSubscriptionAfterRestart() throws Exception {
+ // create offline subs 1
+ // (noLocal is false here: the producer below shares this connection's session)
+ Connection con = createConnection("offCli1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+
+ // send messages
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "false");
+ producer.send(topic, message);
+ }
+
+ LOG.info("sent: " + sent);
+ Thread.sleep(5 * 1000);
+ session.close();
+ con.close();
+
+ assertEquals(sent, listener.count);
+
+ // restart broker
+ Thread.sleep(3 * 1000);
+ broker.stop();
+ createBroker(false /*deleteAllMessages*/);
+
+ // send more messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(null);
+
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "false");
+ producer.send(topic, message);
+ }
+
+ LOG.info("after restart, sent: " + sent);
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // test offline subs
+ con = createConnection("offCli1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+ // same listener as before the restart: its count continues from the first batch
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ // both "sent" and listener.count are cumulative over the two batches
+ assertEquals(sent, listener.count);
+ }
+
+ /**
+  * Listener that counts every delivered message and re-checks the part of the
+  * selector that decides a match: a message that carries $b must have
+  * $c=true, any other message must have $d equal to D1 or D2.
+  * A JMSException raised while reading the properties is logged and collected
+  * in the enclosing test's {@code exceptions} list rather than thrown.
+  */
+ public class FilterCheckListener extends DurableSubscriptionOfflineTestListener {
+
+     /**
+      * Counts the message, then verifies its selector-relevant properties.
+      *
+      * @param message message handed over by the consumer this listener is set on
+      */
+     @Override
+     public void onMessage(Message message) {
+         count++;
+
+         try {
+             if (message.getObjectProperty("$b") != null) {
+                 // $b present: only the ($b=true AND $c=true) branch can have matched
+                 assertTrue("message with $b set must have $c=true", message.getBooleanProperty("$c"));
+             } else {
+                 String d = message.getStringProperty("$d");
+                 assertTrue("unexpected $d for a matched message: " + d, "D1".equals(d) || "D2".equals(d));
+             }
+         } catch (JMSException e) {
+             // Log through SLF4J instead of printing to stderr; the test method
+             // asserts afterwards that the exceptions list stayed empty.
+             LOG.error("failed to read properties of received message", e);
+             exceptions.add(e);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
new file mode 100644
index 0000000..09c50d0
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Verifies that journal data kept only for an unsubscribed durable subscription is
+ * cleaned up after broker restarts (AMQ-3206). The test runs once for each value of
+ * {@code keepDurableSubsActive}.
+ */
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline4Test extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline4Test.class);
+
+    /**
+     * Supplies the constructor arguments for the parameterized runner.
+     *
+     * @return one single-element array per {@code keepDurableSubsActive} value
+     *         ({@code false}, then {@code true})
+     */
+    @Parameterized.Parameters(name = "keepDurableSubsActive_{0}")
+    public static Collection<Boolean[]> getTestParameters() {
+        List<Boolean[]> booleanChoices = new ArrayList<Boolean[]>();
+        booleanChoices.add(new Boolean[] {Boolean.FALSE});
+        booleanChoices.add(new Boolean[] {Boolean.TRUE});
+        return booleanChoices;
+    }
+
+    /**
+     * Configures the test with a 64 KiB {@code journalMaxFileLength} and the given
+     * subscription mode.
+     *
+     * @param keepDurableSubsActive value assigned to the {@code keepDurableSubsActive} field
+     */
+    public DurableSubscriptionOffline4Test(Boolean keepDurableSubsActive) {
+        this.journalMaxFileLength = 64 * 1024;
+        this.keepDurableSubsActive = keepDurableSubsActive.booleanValue();
+
+        // NOTE(review): if testName is a JUnit TestName rule, the method name is not yet
+        // populated while the constructor runs and is logged as null - confirm.
+        LOG.info(">>>> running {} with keepDurableSubsActive: {}, journalMaxFileLength: {}",
+                testName.getMethodName(), this.keepDurableSubsActive, journalMaxFileLength);
+    }
+
+    /**
+     * Registers two durable subscriptions, publishes messages while both are offline,
+     * unsubscribes the first, restarts the broker, drains the second and finally checks
+     * that at most three journal files remain after another restart.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/AMQ-3206">AMQ-3206</a>
+     */
+    @Test(timeout = 60 * 1000)
+    public void testCleanupDeletedSubAfterRestart() throws Exception {
+        // Register the durable subscription for cli1 and go offline.
+        Connection con = createConnection("cli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        // Register the durable subscription for cli2 and go offline.
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        final int toSend = 500;
+        // Build a real 40K-character payload. Calling toString() on a byte[] only yields
+        // the array's identity string ("[B@..."), which would leave the journal almost
+        // empty and make the final journal-file check meaningless.
+        final String payload = new String(new char[40 * 1024]).replace('\0', 'x');
+        for (int i = 0; i < toSend; i++) {
+            Message message = session.createTextMessage(payload);
+            message.setStringProperty("filter", "false");
+            message.setIntProperty("ID", i);
+            producer.send(topic, message);
+        }
+        con.close();
+        LOG.info("sent: " + toSend);
+
+        // kill off cli1
+        con = createConnection("cli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.unsubscribe("SubsId");
+        // Release the client resources before the broker goes away.
+        session.close();
+        con.close();
+
+        destroyBroker();
+        createBroker(false);
+
+        // cli2 must still receive everything that was published.
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+        final DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+        consumer.setMessageListener(listener);
+        assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Want: " + toSend + ", current: " + listener.count);
+                return listener.count == toSend;
+            }
+        }));
+        session.close();
+        con.close();
+
+        destroyBroker();
+        createBroker(false);
+        final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        // Wait first, then build the failure message, so that it reports the journal
+        // file count observed after the wait rather than the count before it started.
+        final boolean cleanedUp = Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return pa.getStore().getJournal().getFileMap().size() <= 3;
+            }
+        });
+        assertTrue("Should have at most three journal files left but was: "
+                + pa.getStore().getJournal().getFileMap().size(), cleanedUp);
+    }
+}
+