You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2013/12/19 14:37:21 UTC
[1/2] For AMQ-4874, broke into multiple parts,
and converted to use JUnit4 Parameterized instead of
CombinationTestSupport
Updated Branches:
refs/heads/trunk a64976a37 -> 57f5d49ae
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
index 1473025..15c0627 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
@@ -16,50 +16,32 @@
*/
package org.apache.activemq.usecases;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.management.ObjectName;
-
-import junit.framework.Test;
+import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
-import org.apache.activemq.broker.jmx.TopicViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
-public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
+public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTestBase {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
- public boolean usePrioritySupport = Boolean.TRUE;
- public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
- public boolean keepDurableSubsActive = true;
- private BrokerService broker;
- private ActiveMQTopic topic;
- private final List<Throwable> exceptions = new ArrayList<Throwable>();
@Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
@@ -68,84 +50,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
return connectionFactory;
}
- @Override
- protected Connection createConnection() throws Exception {
- return createConnection("cliName");
- }
-
- protected Connection createConnection(String name) throws Exception {
- Connection con = super.createConnection();
- con.setClientID(name);
- con.start();
- return con;
- }
-
- public static Test suite() {
- return suite(DurableSubscriptionOfflineTest.class);
- }
-
- @Override
- protected void setUp() throws Exception {
- setAutoFail(true);
- setMaxTestTime(2 * 60 * 1000);
- exceptions.clear();
- topic = (ActiveMQTopic) createDestination();
- createBroker();
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- destroyBroker();
- }
-
- private void createBroker() throws Exception {
- createBroker(true);
- }
-
- private void createBroker(boolean deleteAllMessages) throws Exception {
- broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
- broker.setBrokerName(getName(true));
- broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
- broker.getManagementContext().setCreateConnector(false);
- broker.setAdvisorySupport(false);
- broker.setKeepDurableSubsActive(keepDurableSubsActive);
- broker.addConnector("tcp://0.0.0.0:0");
-
- if (usePrioritySupport) {
- PolicyEntry policy = new PolicyEntry();
- policy.setPrioritizedMessages(true);
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(policy);
- broker.setDestinationPolicy(policyMap);
- }
-
- setDefaultPersistenceAdapter(broker);
- if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
- // ensure it kicks in during tests
- ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
- } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
- // have lots of journal files
- ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
- }
- broker.start();
- broker.waitUntilStarted();
- }
-
- private void destroyBroker() throws Exception {
- if (broker != null)
- broker.stop();
- }
-
- public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- this.addCombinationValues("usePrioritySupport",
- new Object[]{ Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testConsumeOnlyMatchedMessages() throws Exception {
+ @Test(timeout = 60 * 1000)
+ public void testConsumeAllMatchedMessages() throws Exception {
// create durable subscription
Connection con = createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -160,15 +66,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
int sent = 0;
for (int i = 0; i < 10; i++) {
- boolean filter = i % 2 == 1;
- if (filter)
- sent++;
-
+ sent++;
Message message = session.createMessage();
- message.setStringProperty("filter", filter ? "true" : "false");
+ message.setStringProperty("filter", "true");
producer.send(topic, message);
}
+ Thread.sleep(1 * 1000);
+
session.close();
con.close();
@@ -176,7 +81,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
@@ -187,110 +92,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(sent, listener.count);
}
- public void testConsumeAllMatchedMessages() throws Exception {
- // create durable subscription
- Connection con = createConnection();
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
-
- session.close();
- con.close();
-
- // consume messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals(sent, listener.count);
- }
-
- public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- this.addCombinationValues("usePrioritySupport",
- new Object[]{ Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testVerifyAllConsumedAreAcked() throws Exception {
- // create durable subscription
- Connection con = createConnection();
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
-
- session.close();
- con.close();
-
- // consume messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- LOG.info("Consumed: " + listener.count);
- assertEquals(sent, listener.count);
-
- // consume messages again, should not get any
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals(0, listener.count);
- }
+ @Test(timeout = 60 * 1000)
public void testTwoOfflineSubscriptionCanConsume() throws Exception {
// create durable subscription 1
Connection con = createConnection("cliId1");
@@ -303,7 +106,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
Connection con2 = createConnection("cliId2");
Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener2 = new Listener();
+ DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
consumer2.setMessageListener(listener2);
// send messages
@@ -334,273 +137,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con = createConnection("cliId1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals("offline consumer got all", sent, listener.count);
- }
-
- public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
- this.addCombinationValues("keepDurableSubsActive",
- new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testJMXCountersWithOfflineSubs() throws Exception {
- // create durable subscription 1
- Connection con = createConnection("cliId1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", null, true);
- session.close();
- con.close();
-
- // restart broker
- broker.stop();
- createBroker(false /*deleteAllMessages*/);
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- producer.send(topic, message);
- }
- session.close();
- con.close();
-
- // consume some messages
- con = createConnection("cliId1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-
- for (int i=0; i<sent/2; i++) {
- Message m = consumer.receive(4000);
- assertNotNull("got message: " + i, m);
- LOG.info("Got :" + i + ", " + m);
- }
-
- // check some counters while active
- ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
- LOG.info("active durable sub name: " + activeDurableSubName);
- final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
- broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
-
- assertTrue("is active", durableSubscriptionView.isActive());
- assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
- assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
- }
- }));
- assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
-
-
- ObjectName destinationName = broker.getAdminView().getTopics()[0];
- TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
- assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
- assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
- assertEquals("inflight", 5, topicView.getInFlightCount());
-
- session.close();
- con.close();
-
- // check some counters when inactive
- ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
- LOG.info("inactive durable sub name: " + inActiveDurableSubName);
- DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
- broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
-
- assertTrue("is not active", !durableSubscriptionView1.isActive());
- assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
- assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
- assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
-
- // destination view
- assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
- assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
- assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
-
- // consume the rest
- con = createConnection("cliId1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-
- for (int i=0; i<sent/2;i++) {
- Message m = consumer.receive(30000);
- assertNotNull("got message: " + i, m);
- LOG.info("Got :" + i + ", " + m);
- }
-
- activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
- LOG.info("durable sub name: " + activeDurableSubName);
- final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
- broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
-
- assertTrue("is active", durableSubscriptionView2.isActive());
- assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
- assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- long val = durableSubscriptionView2.getDequeueCounter();
- LOG.info("dequeue count:" + val);
- return 10 == val;
- }
- }));
- }
-
- public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- this.addCombinationValues("usePrioritySupport",
- new Object[]{ Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- con = createConnection("offCli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- Connection con2 = createConnection("onlineCli1");
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener2 = new Listener();
- consumer2.setMessageListener(listener2);
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // test online subs
- Thread.sleep(3 * 1000);
- session2.close();
- con2.close();
- assertEquals(sent, listener2.count);
-
- // restart broker
- broker.stop();
- createBroker(false /*deleteAllMessages*/);
-
- // test offline
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-
- Connection con3 = createConnection("offCli2");
- Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
- Listener listener3 = new Listener();
- consumer3.setMessageListener(listener3);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
- session3.close();
- con3.close();
-
- assertEquals(sent, listener.count);
- assertEquals(sent, listener3.count);
- }
-
- public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- }
-
- public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
- // create durable subscription 1
- Connection con = createConnection("cliId1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
-
- // create durable subscription 2
- Connection con2 = createConnection("cliId2");
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener2 = new Listener();
- consumer2.setMessageListener(listener2);
-
- assertEquals(0, listener2.count);
- session2.close();
- con2.close();
-
- // send some more
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- con2 = createConnection("cliId2");
- session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- listener2 = new Listener("cliId2");
- consumer2.setMessageListener(listener2);
- // test online subs
- Thread.sleep(3 * 1000);
-
- assertEquals(10, listener2.count);
-
- // consume all messages
- con = createConnection("cliId1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener("cliId1");
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
@@ -611,117 +148,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals("offline consumer got all", sent, listener.count);
}
- public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- }
-
- private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
- public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
- // create offline subs 1
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", filter, true);
- session.close();
- con.close();
-
- // create offline subs 2
- con = createConnection("offCli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", filter, true);
- session.close();
- con.close();
-
- // create online subs
- Connection con2 = createConnection("onlineCli1");
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener2 = new Listener();
- consumer2.setMessageListener(listener2);
-
- // create non-durable consumer
- Connection con4 = createConnection("nondurableCli");
- Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
- Listener listener4 = new Listener();
- consumer4.setMessageListener(listener4);
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- boolean hasRelevant = false;
- int filtered = 0;
- for (int i = 0; i < 100; i++) {
- int postf = (int) (Math.random() * 9) + 1;
- String d = "D" + postf;
-
- if ("D1".equals(d) || "D2".equals(d)) {
- hasRelevant = true;
- filtered++;
- }
-
- Message message = session.createMessage();
- message.setStringProperty("$a", "A1");
- message.setStringProperty("$d", d);
- producer.send(topic, message);
- }
-
- Message message = session.createMessage();
- message.setStringProperty("$a", "A1");
- message.setBooleanProperty("$b", true);
- message.setBooleanProperty("$c", hasRelevant);
- producer.send(topic, message);
-
- if (hasRelevant)
- filtered++;
-
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- Thread.sleep(3 * 1000);
-
- // test non-durable consumer
- session4.close();
- con4.close();
- assertEquals(filtered, listener4.count); // succeeded!
-
- // test online subs
- session2.close();
- con2.close();
- assertEquals(filtered, listener2.count); // succeeded!
-
- // test offline 1
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener = new FilterCheckListener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
- session.close();
- con.close();
-
- assertEquals(filtered, listener.count);
-
- // test offline 2
- Connection con3 = createConnection("offCli2");
- Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener3 = new FilterCheckListener();
- consumer3.setMessageListener(listener3);
-
- Thread.sleep(3 * 1000);
- session3.close();
- con3.close();
-
- assertEquals(filtered, listener3.count);
- assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
- }
-
+ @Test(timeout = 60 * 1000)
public void testRemovedDurableSubDeletes() throws Exception {
+ String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
// create durable subscription 1
Connection con = createConnection("cliId1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -753,13 +182,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con = createConnection("offCli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener = new Listener();
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
consumer.setMessageListener(listener);
session.close();
con.close();
assertEquals(0, listener.count);
}
+ @Test(timeout = 60 * 1000)
public void testRemovedDurableSubDeletesFromIndex() throws Exception {
if (! (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
@@ -810,169 +240,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
}
}
- public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- }
-
- public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
-
- if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
- // https://issues.apache.org/jira/browse/AMQ-4296
- return;
- }
-
- // create offline subs 1
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // create offline subs 2
- con = createConnection("offCli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int filtered = 0;
- for (int i = 0; i < 10; i++) {
- boolean filter = (int) (Math.random() * 2) >= 1;
- if (filter)
- filtered++;
-
- Message message = session.createMessage();
- message.setStringProperty("filter", filter ? "true" : "false");
- producer.send(topic, message);
- }
-
- LOG.info("sent: " + filtered);
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // restart broker
- Thread.sleep(3 * 1000);
- broker.stop();
- createBroker(false /*deleteAllMessages*/);
-
- // send more messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(null);
-
- for (int i = 0; i < 10; i++) {
- boolean filter = (int) (Math.random() * 2) >= 1;
- if (filter)
- filtered++;
-
- Message message = session.createMessage();
- message.setStringProperty("filter", filter ? "true" : "false");
- producer.send(topic, message);
- }
-
- LOG.info("after restart, total sent with filter='true': " + filtered);
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // test offline subs
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener("1>");
- consumer.setMessageListener(listener);
-
- Connection con3 = createConnection("offCli2");
- Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener3 = new Listener();
- consumer3.setMessageListener(listener3);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
- session3.close();
- con3.close();
-
- assertEquals(filtered, listener.count);
- assertEquals(filtered, listener3.count);
- }
-
- public void initCombosForTestOfflineSubscriptionAfterRestart() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- }
-
- public void testOfflineSubscriptionAfterRestart() throws Exception {
- // create offline subs 1
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- // send messages
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "false");
- producer.send(topic, message);
- }
-
- LOG.info("sent: " + sent);
- Thread.sleep(5 * 1000);
- session.close();
- con.close();
-
- assertEquals(sent, listener.count);
-
- // restart broker
- Thread.sleep(3 * 1000);
- broker.stop();
- createBroker(false /*deleteAllMessages*/);
-
- // send more messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(null);
-
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "false");
- producer.send(topic, message);
- }
-
- LOG.info("after restart, sent: " + sent);
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // test offline subs
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals(sent, listener.count);
- }
-
+ @Test(timeout = 60 * 1000)
public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
// create offline subs 1
Connection con = createConnection("offCli1");
@@ -1016,7 +284,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con = createConnection("offCli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
- Listener listener = new Listener("SubsId");
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("SubsId");
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
@@ -1027,6 +295,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals("offline consumer got all", sent, listener.count);
}
+ @Test(timeout = 60 * 1000)
public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
final int messageCount = 1000;
Connection con = null;
@@ -1117,10 +386,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
}
- /*
- * Ignoring for now, see https://issues.apache.org/jira/browse/AMQ-4874
- */
- public void XXXtestOrderOnActivateDeactivate() throws Exception {
+ @Ignore("see https://issues.apache.org/jira/browse/AMQ-4874")
+ @Test(timeout = 60 * 1000)
+ public void testOrderOnActivateDeactivate() throws Exception {
for (int i=0;i<10;i++) {
LOG.info("Iteration: " + i);
doTestOrderOnActivateDeactivate();
@@ -1145,13 +413,13 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
}
final String url = "failover:(tcp://localhost:"
- + (broker.getTransportConnectors().get(1).getConnectUri()).getPort()
- + "?wireFormat.maxInactivityDuration=0)?"
- + "jms.watchTopicAdvisories=false&"
- + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
- + "jms.sendAcksAsync=true&"
- + "initialReconnectDelay=100&maxReconnectDelay=30000&"
- + "useExponentialBackOff=true";
+ + (broker.getTransportConnectors().get(1).getConnectUri()).getPort()
+ + "?wireFormat.maxInactivityDuration=0)?"
+ + "jms.watchTopicAdvisories=false&"
+ + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
+ + "jms.sendAcksAsync=true&"
+ + "initialReconnectDelay=100&maxReconnectDelay=30000&"
+ + "useExponentialBackOff=true";
final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory(url);
class CheckOrderClient implements Runnable {
@@ -1235,6 +503,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
}
+ @Test(timeout = 60 * 1000)
public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
// create offline subs 1
Connection con = createConnection("offCli1");
@@ -1274,7 +543,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con = createConnection("offCli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
@@ -1285,6 +554,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(0, listener.count);
}
+ @Test(timeout = 60 * 1000)
public void testAllConsumed() throws Exception {
final String filter = "filter = 'true'";
Connection con = createConnection("cli1");
@@ -1320,7 +590,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con = createConnection("cli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener = new Listener();
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
session.close();
@@ -1358,7 +628,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con = createConnection("cli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
- listener = new Listener();
+ listener = new DurableSubscriptionOfflineTestListener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
session.close();
@@ -1368,6 +638,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
}
// https://issues.apache.org/jira/browse/AMQ-3190
+ @Test(timeout = 60 * 1000)
public void testNoMissOnMatchingSubAfterRestart() throws Exception {
final String filter = "filter = 'true'";
@@ -1447,80 +718,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con.close();
}
- // use very small journal to get lots of files to cleanup
- public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
- this.addCombinationValues("journalMaxFileLength",
- new Object[]{new Integer(64 * 1024)});
- this.addCombinationValues("keepDurableSubsActive",
- new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- // https://issues.apache.org/jira/browse/AMQ-3206
- public void testCleanupDeletedSubAfterRestart() throws Exception {
- Connection con = createConnection("cli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", null, true);
- session.close();
- con.close();
-
- con = createConnection("cli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", null, true);
- session.close();
- con.close();
-
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- final int toSend = 500;
- final String payload = new byte[40*1024].toString();
- int sent = 0;
- for (int i = sent; i < toSend; i++) {
- Message message = session.createTextMessage(payload);
- message.setStringProperty("filter", "false");
- message.setIntProperty("ID", i);
- producer.send(topic, message);
- sent++;
- }
- con.close();
- LOG.info("sent: " + sent);
-
- // kill off cli1
- con = createConnection("cli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.unsubscribe("SubsId");
-
- destroyBroker();
- createBroker(false);
-
- con = createConnection("cli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
- final Listener listener = new Listener();
- consumer.setMessageListener(listener);
- assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("Want: " + toSend + ", current: " + listener.count);
- return listener.count == toSend;
- }
- }));
- session.close();
- con.close();
-
- destroyBroker();
- createBroker(false);
- final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
- assertTrue("Should have less than three journal files left but was: " +
- pa.getStore().getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return pa.getStore().getJournal().getFileMap().size() <= 3;
- }
- }));
- }
// // https://issues.apache.org/jira/browse/AMQ-3768
// public void testPageReuse() throws Exception {
@@ -1679,46 +876,4 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
// assertTrue("No exceptions expected, but was: " + exceptions, exceptions.isEmpty());
// }
- public static class Listener implements MessageListener {
- int count = 0;
- String id = null;
-
- Listener() {
- }
- Listener(String id) {
- this.id = id;
- }
- @Override
- public void onMessage(Message message) {
- count++;
- if (id != null) {
- try {
- LOG.info(id + ", " + message.getJMSMessageID());
- } catch (Exception ignored) {}
- }
- }
- }
-
- public class FilterCheckListener extends Listener {
-
- @Override
- public void onMessage(Message message) {
- count++;
-
- try {
- Object b = message.getObjectProperty("$b");
- if (b != null) {
- boolean c = message.getBooleanProperty("$c");
- assertTrue("", c);
- } else {
- String d = message.getStringProperty("$d");
- assertTrue("", "D1".equals(d) || "D2".equals(d));
- }
- }
- catch (JMSException e) {
- e.printStackTrace();
- exceptions.add(e);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
new file mode 100644
index 0000000..3bffb4e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageListener;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public abstract class DurableSubscriptionOfflineTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTestBase.class);
+ public boolean usePrioritySupport = Boolean.TRUE;
+ public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+ public boolean keepDurableSubsActive = true;
+ protected BrokerService broker;
+ protected ActiveMQTopic topic;
+ protected final List<Throwable> exceptions = new ArrayList<Throwable>();
+ protected ActiveMQConnectionFactory connectionFactory;
+ protected boolean isTopic = true;
+ public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
+ connectionFactory.setWatchTopicAdvisories(false);
+ return connectionFactory;
+ }
+
+ protected Connection createConnection() throws Exception {
+ return createConnection("cliName");
+ }
+
+ protected Connection createConnection(String name) throws Exception {
+ ConnectionFactory connectionFactory1 = createConnectionFactory();
+ Connection connection = connectionFactory1.createConnection();
+ connection.setClientID(name);
+ connection.start();
+ return connection;
+ }
+
+ public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
+ if (connectionFactory == null) {
+ connectionFactory = createConnectionFactory();
+ assertTrue("Should have created a connection factory!", connectionFactory != null);
+ }
+ return connectionFactory;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ exceptions.clear();
+ topic = (ActiveMQTopic) createDestination();
+ createBroker();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ destroyBroker();
+ }
+
+ protected void createBroker() throws Exception {
+ createBroker(true);
+ }
+
+ protected void createBroker(boolean deleteAllMessages) throws Exception {
+ String currentTestName = getName(true);
+ broker = BrokerFactory.createBroker("broker:(vm://" + currentTestName +")");
+ broker.setBrokerName(currentTestName);
+ broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ broker.getManagementContext().setCreateConnector(false);
+ broker.setAdvisorySupport(false);
+ broker.setKeepDurableSubsActive(keepDurableSubsActive);
+ broker.addConnector("tcp://0.0.0.0:0");
+
+ if (usePrioritySupport) {
+ PolicyEntry policy = new PolicyEntry();
+ policy.setPrioritizedMessages(true);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(policyMap);
+ }
+
+ setDefaultPersistenceAdapter(broker);
+ if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
+ // ensure it kicks in during tests
+ ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
+ } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+ // have lots of journal files
+ ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
+ }
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+ protected void destroyBroker() throws Exception {
+ if (broker != null)
+ broker.stop();
+ }
+
+ protected Destination createDestination(String subject) {
+ if (isTopic) {
+ return new ActiveMQTopic(subject);
+ } else {
+ return new ActiveMQQueue(subject);
+ }
+ }
+
+ protected Destination createDestination() {
+ return createDestination(getDestinationString());
+ }
+
+ /**
+ * Returns the name of the destination used in this test case
+ */
+ protected String getDestinationString() {
+ return getClass().getName() + "." + getName(true);
+ }
+
+
+ public String getName() {
+ return getName(false);
+ }
+
+ protected String getName(boolean original) {
+ String currentTestName = testName.getMethodName();
+ currentTestName = currentTestName.replace("[","");
+ currentTestName = currentTestName.replace("]","");
+ return currentTestName;
+ }
+
+ public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
+ return setPersistenceAdapter(broker, defaultPersistenceAdapter);
+ }
+
+ public PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException {
+ PersistenceAdapter adapter = null;
+ switch (choice) {
+ case JDBC:
+ LOG.debug(">>>> setPersistenceAdapter to JDBC ");
+ adapter = new JDBCPersistenceAdapter();
+ break;
+ case KahaDB:
+ LOG.debug(">>>> setPersistenceAdapter to KahaDB ");
+ adapter = new KahaDBPersistenceAdapter();
+ break;
+ case LevelDB:
+ LOG.debug(">>>> setPersistenceAdapter to LevelDB ");
+ adapter = new LevelDBPersistenceAdapter();
+ break;
+ case MEM:
+ LOG.debug(">>>> setPersistenceAdapter to MEM ");
+ adapter = new MemoryPersistenceAdapter();
+ break;
+ }
+ broker.setPersistenceAdapter(adapter);
+ return adapter;
+ }
+}
+
+class DurableSubscriptionOfflineTestListener implements MessageListener {
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTestListener.class);
+ int count = 0;
+ String id = null;
+
+ DurableSubscriptionOfflineTestListener() {}
+
+ DurableSubscriptionOfflineTestListener(String id) {
+ this.id = id;
+ }
+ @Override
+ public void onMessage(javax.jms.Message message) {
+ count++;
+ if (id != null) {
+ try {
+ LOG.info(id + ", " + message.getJMSMessageID());
+ } catch (Exception ignored) {}
+ }
+ }
+}
\ No newline at end of file
[2/2] git commit: For AMQ-4874, broke into multiple parts,
and converted to use JUnit4 Parameterized instead of
CombinationTestSupport
Posted by ke...@apache.org.
For AMQ-4874, broke into multiple parts, and converted to use JUnit4 Parameterized instead of CombinationTestSupport
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/57f5d49a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/57f5d49a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/57f5d49a
Branch: refs/heads/trunk
Commit: 57f5d49ae9a76f2a3a17fa7c7966130c7a7352fe
Parents: a64976a
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Thu Dec 19 14:37:12 2013 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Thu Dec 19 14:37:12 2013 +0100
----------------------------------------------------------------------
.../DurableSubscriptionOffline1Test.java | 248 +++++
.../DurableSubscriptionOffline2Test.java | 171 ++++
.../DurableSubscriptionOffline3Test.java | 424 +++++++++
.../DurableSubscriptionOffline4Test.java | 131 +++
.../DurableSubscriptionOfflineTest.java | 941 +------------------
.../DurableSubscriptionOfflineTestBase.java | 221 +++++
6 files changed, 1243 insertions(+), 893 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
new file mode 100644
index 0000000..67745f9
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline1Test.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.actors.threadpool.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline1Test extends DurableSubscriptionOfflineTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline1Test.class);
+
+ @Parameterized.Parameters(name = "{0}-{1}")
+ public static Collection<Object[]> getTestParameters() {
+ String osName = System.getProperty("os.name");
+ LOG.debug("Running on [" + osName + "]");
+
+ List<PersistenceAdapterChoice> persistenceAdapterChoices = new ArrayList<PersistenceAdapterChoice>();
+
+ persistenceAdapterChoices.add(PersistenceAdapterChoice.KahaDB);
+ persistenceAdapterChoices.add(PersistenceAdapterChoice.JDBC);
+ if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
+ //choices.add(levelDb);
+ persistenceAdapterChoices.add(PersistenceAdapterChoice.LevelDB);
+ }
+
+ List<Object[]> testParameters = new ArrayList<Object[]>();
+ Boolean[] booleanValues = {Boolean.FALSE, Boolean.TRUE};
+ List<Boolean> booleans = Arrays.asList(booleanValues);
+ for (Boolean booleanValue : booleans) {
+ for (PersistenceAdapterChoice persistenceAdapterChoice : persistenceAdapterChoices) {
+ Object[] currentChoice = {persistenceAdapterChoice, booleanValue};
+ testParameters.add(currentChoice);
+ }
+ }
+
+ return testParameters;
+ }
+
+ public DurableSubscriptionOffline1Test(PersistenceAdapterChoice adapter, Boolean usePrioritySupport) {
+ this.defaultPersistenceAdapter = adapter;
+ this.usePrioritySupport = usePrioritySupport.booleanValue();
+ LOG.debug(">>>> Created with adapter {} usePrioritySupport? {}", defaultPersistenceAdapter, usePrioritySupport);
+
+ }
+
+ @Test
+ public void testConsumeOnlyMatchedMessages() throws Exception {
+ // create durable subscription
+ Connection con = createConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ boolean filter = i % 2 == 1;
+ if (filter)
+ sent++;
+
+ Message message = session.createMessage();
+ message.setStringProperty("filter", filter ? "true" : "false");
+ producer.send(topic, message);
+ }
+
+ session.close();
+ con.close();
+
+ // consume messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals(sent, listener.count);
+ }
+
+ @Test
+ public void testVerifyAllConsumedAreAcked() throws Exception {
+ // create durable subscription
+ Connection con = createConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+
+ session.close();
+ con.close();
+
+ // consume messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ LOG.info("Consumed: " + listener.count);
+ assertEquals(sent, listener.count);
+
+ // consume messages again, should not get any
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals(0, listener.count);
+ }
+
+ @Test
+ public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+ Connection con = createConnection("offCli1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ con = createConnection("offCli2");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ Connection con2 = createConnection("onlineCli1");
+ Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+ consumer2.setMessageListener(listener2);
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // test online subs
+ Thread.sleep(3 * 1000);
+ session2.close();
+ con2.close();
+ assertEquals(sent, listener2.count);
+
+ // restart broker
+ broker.stop();
+ createBroker(false /*deleteAllMessages*/);
+
+ // test offline
+ con = createConnection("offCli1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+ Connection con3 = createConnection("offCli2");
+ Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+ DurableSubscriptionOfflineTestListener listener3 = new DurableSubscriptionOfflineTestListener();
+ consumer3.setMessageListener(listener3);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+ session3.close();
+ con3.close();
+
+ assertEquals(sent, listener.count);
+ assertEquals(sent, listener3.count);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
new file mode 100644
index 0000000..960d9ea
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline2Test.class);
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Boolean[]> getTestParameters() {
+ Boolean[] f = {Boolean.FALSE};
+ Boolean[] t = {Boolean.TRUE};
+ List<Boolean[]> booleanChoices = new ArrayList<Boolean[]>();
+ booleanChoices.add(f);
+ booleanChoices.add(t);
+
+ return booleanChoices;
+ }
+
+ public DurableSubscriptionOffline2Test(Boolean keepDurableSubsActive) {
+ this.keepDurableSubsActive = keepDurableSubsActive.booleanValue();
+
+ LOG.info(">>>> running {} with keepDurableSubsActive: {}", testName.getMethodName(), this.keepDurableSubsActive);
+ }
+
+
+ @Test(timeout = 60 * 1000)
+ public void testJMXCountersWithOfflineSubs() throws Exception {
+ // create durable subscription 1
+ Connection con = createConnection("cliId1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", null, true);
+ session.close();
+ con.close();
+
+ // restart broker
+ broker.stop();
+ createBroker(false /*deleteAllMessages*/);
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ producer.send(topic, message);
+ }
+ session.close();
+ con.close();
+
+ // consume some messages
+ con = createConnection("cliId1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+ for (int i=0; i<sent/2; i++) {
+ Message m = consumer.receive(4000);
+ assertNotNull("got message: " + i, m);
+ LOG.info("Got :" + i + ", " + m);
+ }
+
+ // check some counters while active
+ ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+ LOG.info("active durable sub name: " + activeDurableSubName);
+ final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+ assertTrue("is active", durableSubscriptionView.isActive());
+ assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
+ assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
+ }
+ }));
+ assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
+
+
+ ObjectName destinationName = broker.getAdminView().getTopics()[0];
+ TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
+ assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+ assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+ assertEquals("inflight", 5, topicView.getInFlightCount());
+
+ session.close();
+ con.close();
+
+ // check some counters when inactive
+ ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
+ LOG.info("inactive durable sub name: " + inActiveDurableSubName);
+ DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+ assertTrue("is not active", !durableSubscriptionView1.isActive());
+ assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
+ assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
+ assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
+
+ // destination view
+ assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+ assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+ assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
+
+ // consume the rest
+ con = createConnection("cliId1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+
+ for (int i=0; i<sent/2;i++) {
+ Message m = consumer.receive(30000);
+ assertNotNull("got message: " + i, m);
+ LOG.info("Got :" + i + ", " + m);
+ }
+
+ activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+ LOG.info("durable sub name: " + activeDurableSubName);
+ final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+
+ assertTrue("is active", durableSubscriptionView2.isActive());
+ assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
+ assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ long val = durableSubscriptionView2.getDequeueCounter();
+ LOG.info("dequeue count:" + val);
+ return 10 == val;
+ }
+ }));
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
new file mode 100644
index 0000000..c0aee13
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline3Test extends DurableSubscriptionOfflineTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline3Test.class);
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<PersistenceAdapterChoice[]> getTestParameters() {
+ String osName = System.getProperty("os.name");
+ LOG.debug("Running on [" + osName + "]");
+
+ PersistenceAdapterChoice[] kahaDb = {PersistenceAdapterChoice.KahaDB};
+ PersistenceAdapterChoice[] jdbc = {PersistenceAdapterChoice.JDBC};
+ List<PersistenceAdapterChoice[]> choices = new ArrayList<PersistenceAdapterChoice[]>();
+ choices.add(kahaDb);
+ choices.add(jdbc);
+ if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
+ PersistenceAdapterChoice[] levelDb = {PersistenceAdapterChoice.LevelDB};
+ choices.add(levelDb);
+ }
+
+ return choices;
+ }
+
+ public DurableSubscriptionOffline3Test(PersistenceAdapterChoice persistenceAdapterChoice) {
+ this.defaultPersistenceAdapter = persistenceAdapterChoice;
+
+ LOG.info(">>>> running {} with persistenceAdapterChoice: {}", testName.getMethodName(), this.defaultPersistenceAdapter);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
+ // create durable subscription 1
+ Connection con = createConnection("cliId1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+
+ // create durable subscription 2
+ Connection con2 = createConnection("cliId2");
+ Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+ consumer2.setMessageListener(listener2);
+
+ assertEquals(0, listener2.count);
+ session2.close();
+ con2.close();
+
+ // send some more
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ con2 = createConnection("cliId2");
+ session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ listener2 = new DurableSubscriptionOfflineTestListener("cliId2");
+ consumer2.setMessageListener(listener2);
+ // test online subs
+ Thread.sleep(3 * 1000);
+
+ assertEquals(10, listener2.count);
+
+ // consume all messages
+ con = createConnection("cliId1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("cliId1");
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals("offline consumer got all", sent, listener.count);
+ }
+
+ private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
+ @Test(timeout = 60 * 1000)
+ public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
+ // create offline subs 1
+ Connection con = createConnection("offCli1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", filter, true);
+ session.close();
+ con.close();
+
+ // create offline subs 2
+ con = createConnection("offCli2");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", filter, true);
+ session.close();
+ con.close();
+
+ // create online subs
+ Connection con2 = createConnection("onlineCli1");
+ Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
+ DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener();
+ consumer2.setMessageListener(listener2);
+
+ // create non-durable consumer
+ Connection con4 = createConnection("nondurableCli");
+ Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
+ DurableSubscriptionOfflineTestListener listener4 = new DurableSubscriptionOfflineTestListener();
+ consumer4.setMessageListener(listener4);
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ boolean hasRelevant = false;
+ int filtered = 0;
+ for (int i = 0; i < 100; i++) {
+ int postf = (int) (Math.random() * 9) + 1;
+ String d = "D" + postf;
+
+ if ("D1".equals(d) || "D2".equals(d)) {
+ hasRelevant = true;
+ filtered++;
+ }
+
+ Message message = session.createMessage();
+ message.setStringProperty("$a", "A1");
+ message.setStringProperty("$d", d);
+ producer.send(topic, message);
+ }
+
+ Message message = session.createMessage();
+ message.setStringProperty("$a", "A1");
+ message.setBooleanProperty("$b", true);
+ message.setBooleanProperty("$c", hasRelevant);
+ producer.send(topic, message);
+
+ if (hasRelevant)
+ filtered++;
+
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ Thread.sleep(3 * 1000);
+
+ // test non-durable consumer
+ session4.close();
+ con4.close();
+ assertEquals(filtered, listener4.count); // succeeded!
+
+ // test online subs
+ session2.close();
+ con2.close();
+ assertEquals(filtered, listener2.count); // succeeded!
+
+ // test offline 1
+ con = createConnection("offCli1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+ DurableSubscriptionOfflineTestListener listener = new FilterCheckListener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+ session.close();
+ con.close();
+
+ assertEquals(filtered, listener.count);
+
+ // test offline 2
+ Connection con3 = createConnection("offCli2");
+ Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
+ DurableSubscriptionOfflineTestListener listener3 = new FilterCheckListener();
+ consumer3.setMessageListener(listener3);
+
+ Thread.sleep(3 * 1000);
+ session3.close();
+ con3.close();
+
+ assertEquals(filtered, listener3.count);
+ assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+
+ if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
+ // https://issues.apache.org/jira/browse/AMQ-4296
+ return;
+ }
+
+ // create offline subs 1
+ Connection con = createConnection("offCli1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // create offline subs 2
+ con = createConnection("offCli2");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int filtered = 0;
+ for (int i = 0; i < 10; i++) {
+ boolean filter = (int) (Math.random() * 2) >= 1;
+ if (filter)
+ filtered++;
+
+ Message message = session.createMessage();
+ message.setStringProperty("filter", filter ? "true" : "false");
+ producer.send(topic, message);
+ }
+
+ LOG.info("sent: " + filtered);
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // restart broker
+ Thread.sleep(3 * 1000);
+ broker.stop();
+ createBroker(false /*deleteAllMessages*/);
+
+ // send more messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(null);
+
+ for (int i = 0; i < 10; i++) {
+ boolean filter = (int) (Math.random() * 2) >= 1;
+ if (filter)
+ filtered++;
+
+ Message message = session.createMessage();
+ message.setStringProperty("filter", filter ? "true" : "false");
+ producer.send(topic, message);
+ }
+
+ LOG.info("after restart, total sent with filter='true': " + filtered);
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // test offline subs
+ con = createConnection("offCli1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("1>");
+ consumer.setMessageListener(listener);
+
+ Connection con3 = createConnection("offCli2");
+ Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+ DurableSubscriptionOfflineTestListener listener3 = new DurableSubscriptionOfflineTestListener();
+ consumer3.setMessageListener(listener3);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+ session3.close();
+ con3.close();
+
+ assertEquals(filtered, listener.count);
+ assertEquals(filtered, listener3.count);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testOfflineSubscriptionAfterRestart() throws Exception {
+ // create offline subs 1
+ Connection con = createConnection("offCli1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
+ DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+
+ // send messages
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "false");
+ producer.send(topic, message);
+ }
+
+ LOG.info("sent: " + sent);
+ Thread.sleep(5 * 1000);
+ session.close();
+ con.close();
+
+ assertEquals(sent, listener.count);
+
+ // restart broker
+ Thread.sleep(3 * 1000);
+ broker.stop();
+ createBroker(false /*deleteAllMessages*/);
+
+ // send more messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(null);
+
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "false");
+ producer.send(topic, message);
+ }
+
+ LOG.info("after restart, sent: " + sent);
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // test offline subs
+ con = createConnection("offCli1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals(sent, listener.count);
+ }
+
+ public class FilterCheckListener extends DurableSubscriptionOfflineTestListener {
+
+ @Override
+ public void onMessage(Message message) {
+ count++;
+
+ try {
+ Object b = message.getObjectProperty("$b");
+ if (b != null) {
+ boolean c = message.getBooleanProperty("$c");
+ assertTrue("", c);
+ } else {
+ String d = message.getStringProperty("$d");
+ assertTrue("", "D1".equals(d) || "D2".equals(d));
+ }
+ }
+ catch (JMSException e) {
+ e.printStackTrace();
+ exceptions.add(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
new file mode 100644
index 0000000..09c50d0
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOffline4Test extends DurableSubscriptionOfflineTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOffline4Test.class);
+
+ @Parameterized.Parameters(name = "keepDurableSubsActive_{0}")
+ public static Collection<Boolean[]> getTestParameters() {
+ Boolean[] f = {Boolean.FALSE};
+ Boolean[] t = {Boolean.TRUE};
+ List<Boolean[]> booleanChoices = new ArrayList<Boolean[]>();
+ booleanChoices.add(f);
+ booleanChoices.add(t);
+
+ return booleanChoices;
+ }
+
+ public DurableSubscriptionOffline4Test(Boolean keepDurableSubsActive) {
+ this.journalMaxFileLength = 64 * 1024;
+ this.keepDurableSubsActive = keepDurableSubsActive.booleanValue();
+
+ LOG.info(">>>> running {} with keepDurableSubsActive: {}, journalMaxFileLength", testName.getMethodName(), this.keepDurableSubsActive, journalMaxFileLength);
+ }
+
+
+ @Test(timeout = 60 * 1000)
+ // https://issues.apache.org/jira/browse/AMQ-3206
+ public void testCleanupDeletedSubAfterRestart() throws Exception {
+ Connection con = createConnection("cli1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", null, true);
+ session.close();
+ con.close();
+
+ con = createConnection("cli2");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", null, true);
+ session.close();
+ con.close();
+
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ final int toSend = 500;
+ final String payload = new byte[40*1024].toString();
+ int sent = 0;
+ for (int i = sent; i < toSend; i++) {
+ Message message = session.createTextMessage(payload);
+ message.setStringProperty("filter", "false");
+ message.setIntProperty("ID", i);
+ producer.send(topic, message);
+ sent++;
+ }
+ con.close();
+ LOG.info("sent: " + sent);
+
+ // kill off cli1
+ con = createConnection("cli1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.unsubscribe("SubsId");
+
+ destroyBroker();
+ createBroker(false);
+
+ con = createConnection("cli2");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+ final DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
+ consumer.setMessageListener(listener);
+ assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("Want: " + toSend + ", current: " + listener.count);
+ return listener.count == toSend;
+ }
+ }));
+ session.close();
+ con.close();
+
+ destroyBroker();
+ createBroker(false);
+ final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+ assertTrue("Should have less than three journal files left but was: " +
+ pa.getStore().getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return pa.getStore().getJournal().getFileMap().size() <= 3;
+ }
+ }));
+ }
+}
+