You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/15 20:57:54 UTC
svn commit: r1503422 [2/2] - /activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1503422&r1=1503421&r2=1503422&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Mon Jul 15 18:57:54 2013
@@ -17,9 +17,7 @@
package org.apache.activemq.usecases;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -31,15 +29,12 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
-import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
@@ -47,7 +42,6 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,992 +132,992 @@ public class DurableSubscriptionOfflineT
broker.stop();
}
- public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- this.addCombinationValues("usePrioritySupport",
- new Object[]{ Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testConsumeOnlyMatchedMessages() throws Exception {
- // create durable subscription
- Connection con = createConnection();
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- boolean filter = i % 2 == 1;
- if (filter)
- sent++;
-
- Message message = session.createMessage();
- message.setStringProperty("filter", filter ? "true" : "false");
- producer.send(topic, message);
- }
-
- session.close();
- con.close();
-
- // consume messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals(sent, listener.count);
- }
-
- public void testConsumeAllMatchedMessages() throws Exception {
- // create durable subscription
- Connection con = createConnection();
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
-
- session.close();
- con.close();
-
- // consume messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals(sent, listener.count);
- }
-
- public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- this.addCombinationValues("usePrioritySupport",
- new Object[]{ Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testVerifyAllConsumedAreAcked() throws Exception {
- // create durable subscription
- Connection con = createConnection();
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
-
- session.close();
- con.close();
-
- // consume messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- LOG.info("Consumed: " + listener.count);
- assertEquals(sent, listener.count);
-
- // consume messages again, should not get any
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals(0, listener.count);
- }
-
- public void testTwoOfflineSubscriptionCanConsume() throws Exception {
- // create durable subscription 1
- Connection con = createConnection("cliId1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // create durable subscription 2
- Connection con2 = createConnection("cliId2");
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener2 = new Listener();
- consumer2.setMessageListener(listener2);
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // test online subs
- Thread.sleep(3 * 1000);
- session2.close();
- con2.close();
-
- assertEquals(sent, listener2.count);
-
- // consume messages
- con = createConnection("cliId1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals("offline consumer got all", sent, listener.count);
- }
-
- public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
- this.addCombinationValues("keepDurableSubsActive",
- new Object[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testJMXCountersWithOfflineSubs() throws Exception {
- // create durable subscription 1
- Connection con = createConnection("cliId1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", null, true);
- session.close();
- con.close();
-
- // restart broker
- broker.stop();
- createBroker(false /*deleteAllMessages*/);
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- producer.send(topic, message);
- }
- session.close();
- con.close();
-
- // consume some messages
- con = createConnection("cliId1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-
- for (int i=0; i<sent/2; i++) {
- Message m = consumer.receive(4000);
- assertNotNull("got message: " + i, m);
- LOG.info("Got :" + i + ", " + m);
- }
-
- // check some counters while active
- ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
- LOG.info("active durable sub name: " + activeDurableSubName);
- final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
- broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
-
- assertTrue("is active", durableSubscriptionView.isActive());
- assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
- assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
- }
- }));
- assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
-
-
- ObjectName destinationName = broker.getAdminView().getTopics()[0];
- TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
- assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
- assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
- assertEquals("inflight", 5, topicView.getInFlightCount());
-
- session.close();
- con.close();
-
- // check some counters when inactive
- ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
- LOG.info("inactive durable sub name: " + inActiveDurableSubName);
- DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
- broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
-
- assertTrue("is not active", !durableSubscriptionView1.isActive());
- assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
- assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
- assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
-
- // destination view
- assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
- assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
- assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
-
- // consume the rest
- con = createConnection("cliId1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
-
- for (int i=0; i<sent/2;i++) {
- Message m = consumer.receive(30000);
- assertNotNull("got message: " + i, m);
- LOG.info("Got :" + i + ", " + m);
- }
-
- activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
- LOG.info("durable sub name: " + activeDurableSubName);
- final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
- broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
-
- assertTrue("is active", durableSubscriptionView2.isActive());
- assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
- assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- long val = durableSubscriptionView2.getDequeueCounter();
- LOG.info("dequeue count:" + val);
- return 10 == val;
- }
- }));
- }
-
- public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- this.addCombinationValues("usePrioritySupport",
- new Object[]{ Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- con = createConnection("offCli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- Connection con2 = createConnection("onlineCli1");
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener2 = new Listener();
- consumer2.setMessageListener(listener2);
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // test online subs
- Thread.sleep(3 * 1000);
- session2.close();
- con2.close();
- assertEquals(sent, listener2.count);
-
- // restart broker
- broker.stop();
- createBroker(false /*deleteAllMessages*/);
-
- // test offline
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-
- Connection con3 = createConnection("offCli2");
- Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
- Listener listener3 = new Listener();
- consumer3.setMessageListener(listener3);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
- session3.close();
- con3.close();
-
- assertEquals(sent, listener.count);
- assertEquals(sent, listener3.count);
- }
-
- public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- }
-
- public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
- // create durable subscription 1
- Connection con = createConnection("cliId1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
-
- // create durable subscription 2
- Connection con2 = createConnection("cliId2");
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener2 = new Listener();
- consumer2.setMessageListener(listener2);
-
- assertEquals(0, listener2.count);
- session2.close();
- con2.close();
-
- // send some more
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- con2 = createConnection("cliId2");
- session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- listener2 = new Listener("cliId2");
- consumer2.setMessageListener(listener2);
- // test online subs
- Thread.sleep(3 * 1000);
-
- assertEquals(10, listener2.count);
-
- // consume all messages
- con = createConnection("cliId1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener("cliId1");
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals("offline consumer got all", sent, listener.count);
- }
-
- public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- }
-
- private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
- public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
- // create offline subs 1
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", filter, true);
- session.close();
- con.close();
-
- // create offline subs 2
- con = createConnection("offCli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", filter, true);
- session.close();
- con.close();
-
- // create online subs
- Connection con2 = createConnection("onlineCli1");
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener2 = new Listener();
- consumer2.setMessageListener(listener2);
-
- // create non-durable consumer
- Connection con4 = createConnection("nondurableCli");
- Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
- Listener listener4 = new Listener();
- consumer4.setMessageListener(listener4);
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- boolean hasRelevant = false;
- int filtered = 0;
- for (int i = 0; i < 100; i++) {
- int postf = (int) (Math.random() * 9) + 1;
- String d = "D" + postf;
-
- if ("D1".equals(d) || "D2".equals(d)) {
- hasRelevant = true;
- filtered++;
- }
-
- Message message = session.createMessage();
- message.setStringProperty("$a", "A1");
- message.setStringProperty("$d", d);
- producer.send(topic, message);
- }
-
- Message message = session.createMessage();
- message.setStringProperty("$a", "A1");
- message.setBooleanProperty("$b", true);
- message.setBooleanProperty("$c", hasRelevant);
- producer.send(topic, message);
-
- if (hasRelevant)
- filtered++;
-
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- Thread.sleep(3 * 1000);
-
- // test non-durable consumer
- session4.close();
- con4.close();
- assertEquals(filtered, listener4.count); // succeeded!
-
- // test online subs
- session2.close();
- con2.close();
- assertEquals(filtered, listener2.count); // succeeded!
-
- // test offline 1
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener = new FilterCheckListener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
- session.close();
- con.close();
-
- assertEquals(filtered, listener.count);
-
- // test offline 2
- Connection con3 = createConnection("offCli2");
- Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener3 = new FilterCheckListener();
- consumer3.setMessageListener(listener3);
-
- Thread.sleep(3 * 1000);
- session3.close();
- con3.close();
-
- assertEquals(filtered, listener3.count);
- assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
- }
-
- public void testRemovedDurableSubDeletes() throws Exception {
- // create durable subscription 1
- Connection con = createConnection("cliId1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- for (int i = 0; i < 10; i++) {
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
-
- Connection con2 = createConnection("cliId1");
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session2.unsubscribe("SubsId");
- session2.close();
- con2.close();
-
- // see if retroactive can consumer any
- topic = new ActiveMQTopic(topic.getPhysicalName() + "?consumer.retroactive=true");
- con = createConnection("offCli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
- session.close();
- con.close();
- assertEquals(0, listener.count);
- }
-
- public void testRemovedDurableSubDeletesFromIndex() throws Exception {
-
- if (! (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
- return;
- }
-
- final int numMessages = 2750;
-
- KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
- PageFile pageFile = kahaDBPersistenceAdapter.getStore().getPageFile();
- LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + ", fileSize:" + pageFile.getFile().length());
-
- long lastDiff = 0;
- for (int repeats=0; repeats<2; repeats++) {
-
- LOG.info("Iteration: "+ repeats + " Count:" + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount());
-
- Connection con = createConnection("cliId1" + "-" + repeats);
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- for (int i = 0; i < numMessages; i++) {
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- }
- con.close();
-
- Connection con2 = createConnection("cliId1" + "-" + repeats);
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session2.unsubscribe("SubsId");
- session2.close();
- con2.close();
-
- LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + " diff: " + (pageFile.getPageCount() - pageFile.getFreePageCount()) + " fileSize:" + pageFile.getFile().length());
-
- if (lastDiff != 0) {
- assertEquals("Only use X pages per iteration: " + repeats, lastDiff, pageFile.getPageCount() - pageFile.getFreePageCount());
- }
- lastDiff = pageFile.getPageCount() - pageFile.getFreePageCount();
- }
- }
-
- public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- }
-
- public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
-
- if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
- // https://issues.apache.org/jira/browse/AMQ-4296
- return;
- }
-
- // create offline subs 1
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // create offline subs 2
- con = createConnection("offCli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int filtered = 0;
- for (int i = 0; i < 10; i++) {
- boolean filter = (int) (Math.random() * 2) >= 1;
- if (filter)
- filtered++;
-
- Message message = session.createMessage();
- message.setStringProperty("filter", filter ? "true" : "false");
- producer.send(topic, message);
- }
-
- LOG.info("sent: " + filtered);
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // restart broker
- Thread.sleep(3 * 1000);
- broker.stop();
- createBroker(false /*deleteAllMessages*/);
-
- // send more messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(null);
-
- for (int i = 0; i < 10; i++) {
- boolean filter = (int) (Math.random() * 2) >= 1;
- if (filter)
- filtered++;
-
- Message message = session.createMessage();
- message.setStringProperty("filter", filter ? "true" : "false");
- producer.send(topic, message);
- }
-
- LOG.info("after restart, total sent with filter='true': " + filtered);
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // test offline subs
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener("1>");
- consumer.setMessageListener(listener);
-
- Connection con3 = createConnection("offCli2");
- Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener3 = new Listener();
- consumer3.setMessageListener(listener3);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
- session3.close();
- con3.close();
-
- assertEquals(filtered, listener.count);
- assertEquals(filtered, listener3.count);
- }
-
- public void initCombosForTestOfflineAfterRestart() throws Exception {
- this.addCombinationValues("defaultPersistenceAdapter",
- new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
- }
-
- public void testOfflineSubscriptionAfterRestart() throws Exception {
- // create offline subs 1
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- // send messages
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "false");
- producer.send(topic, message);
- }
-
- LOG.info("sent: " + sent);
- Thread.sleep(5 * 1000);
- session.close();
- con.close();
-
- assertEquals(sent, listener.count);
-
- // restart broker
- Thread.sleep(3 * 1000);
- broker.stop();
- createBroker(false /*deleteAllMessages*/);
-
- // send more messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(null);
-
- for (int i = 0; i < 10; i++) {
- sent++;
- Message message = session.createMessage();
- message.setStringProperty("filter", "false");
- producer.send(topic, message);
- }
-
- LOG.info("after restart, sent: " + sent);
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // test offline subs
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals(sent, listener.count);
- }
-
- public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
- // create offline subs 1
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // create offline subs 2
- con = createConnection("offCli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", null, true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- boolean filter = (int) (Math.random() * 2) >= 1;
-
- sent++;
-
- Message message = session.createMessage();
- message.setStringProperty("filter", filter ? "true" : "false");
- producer.send(topic, message);
- }
-
- Thread.sleep(1 * 1000);
-
- Connection con2 = createConnection("offCli1");
- Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session2.unsubscribe("SubsId");
- session2.close();
- con2.close();
-
- // consume all messages
- con = createConnection("offCli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
- Listener listener = new Listener("SubsId");
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals("offline consumer got all", sent, listener.count);
- }
-
- public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
- final int messageCount = 1000;
- Connection con = null;
- Session session = null;
- final int numConsumers = 10;
- for (int i = 0; i <= numConsumers; i++) {
- con = createConnection("cli" + i);
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", null, true);
- session.close();
- con.close();
- }
-
- class CheckForDupsClient implements Runnable {
- HashSet<Long> ids = new HashSet<Long>();
- final int id;
-
- public CheckForDupsClient(int id) {
- this.id = id;
- }
-
- @Override
- public void run() {
- try {
- Connection con = createConnection("cli" + id);
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- for (int j=0;j<2;j++) {
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
- for (int i = 0; i < messageCount/2; i++) {
- Message message = consumer.receive(4000);
- assertNotNull(message);
- long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
- assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
- }
- consumer.close();
- }
-
- // verify no duplicates left
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
- Message message = consumer.receive(4000);
- if (message != null) {
- long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
- assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
- }
- assertNull(message);
-
-
- session.close();
- con.close();
- } catch (Throwable e) {
- e.printStackTrace();
- exceptions.add(e);
- }
- }
- }
-
- final String payLoad = new String(new byte[1000]);
- con = createConnection();
- final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = sendSession.createProducer(topic);
- for (int i = 0; i < messageCount; i++) {
- producer.send(sendSession.createTextMessage(payLoad));
- }
-
- ExecutorService executorService = Executors.newCachedThreadPool();
-
- // concurrent commit and activate
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- sendSession.commit();
- } catch (JMSException e) {
- e.printStackTrace();
- exceptions.add(e);
- }
- }
- });
- for (int i = 0; i < numConsumers; i++) {
- executorService.execute(new CheckForDupsClient(i));
- }
-
- executorService.shutdown();
- executorService.awaitTermination(5, TimeUnit.MINUTES);
- con.close();
-
- assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
- }
-
- public void testOrderOnActivateDeactivate() throws Exception {
- for (int i=0;i<10;i++) {
- LOG.info("Iteration: " + i);
- doTestOrderOnActivateDeactivate();
- broker.stop();
- createBroker(true /*deleteAllMessages*/);
- }
- }
+// public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
+// this.addCombinationValues("defaultPersistenceAdapter",
+// new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+// this.addCombinationValues("usePrioritySupport",
+// new Object[]{ Boolean.TRUE, Boolean.FALSE});
+// }
+//
+// public void testConsumeOnlyMatchedMessages() throws Exception {
+// // create durable subscription
+// Connection con = createConnection();
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// int sent = 0;
+// for (int i = 0; i < 10; i++) {
+// boolean filter = i % 2 == 1;
+// if (filter)
+// sent++;
+//
+// Message message = session.createMessage();
+// message.setStringProperty("filter", filter ? "true" : "false");
+// producer.send(topic, message);
+// }
+//
+// session.close();
+// con.close();
+//
+// // consume messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener = new Listener();
+// consumer.setMessageListener(listener);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+//
+// assertEquals(sent, listener.count);
+// }
+//
+// public void testConsumeAllMatchedMessages() throws Exception {
+// // create durable subscription
+// Connection con = createConnection();
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// int sent = 0;
+// for (int i = 0; i < 10; i++) {
+// sent++;
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "true");
+// producer.send(topic, message);
+// }
+//
+// Thread.sleep(1 * 1000);
+//
+// session.close();
+// con.close();
+//
+// // consume messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener = new Listener();
+// consumer.setMessageListener(listener);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+//
+// assertEquals(sent, listener.count);
+// }
+//
+// public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
+// this.addCombinationValues("defaultPersistenceAdapter",
+// new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+// this.addCombinationValues("usePrioritySupport",
+// new Object[]{ Boolean.TRUE, Boolean.FALSE});
+// }
+//
+// public void testVerifyAllConsumedAreAcked() throws Exception {
+// // create durable subscription
+// Connection con = createConnection();
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// int sent = 0;
+// for (int i = 0; i < 10; i++) {
+// sent++;
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "true");
+// producer.send(topic, message);
+// }
+//
+// Thread.sleep(1 * 1000);
+//
+// session.close();
+// con.close();
+//
+// // consume messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener = new Listener();
+// consumer.setMessageListener(listener);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+//
+// LOG.info("Consumed: " + listener.count);
+// assertEquals(sent, listener.count);
+//
+// // consume messages again, should not get any
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// listener = new Listener();
+// consumer.setMessageListener(listener);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+//
+// assertEquals(0, listener.count);
+// }
+//
+// public void testTwoOfflineSubscriptionCanConsume() throws Exception {
+// // create durable subscription 1
+// Connection con = createConnection("cliId1");
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // create durable subscription 2
+// Connection con2 = createConnection("cliId2");
+// Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener2 = new Listener();
+// consumer2.setMessageListener(listener2);
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// int sent = 0;
+// for (int i = 0; i < 10; i++) {
+// sent++;
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "true");
+// producer.send(topic, message);
+// }
+//
+// Thread.sleep(1 * 1000);
+// session.close();
+// con.close();
+//
+// // test online subs
+// Thread.sleep(3 * 1000);
+// session2.close();
+// con2.close();
+//
+// assertEquals(sent, listener2.count);
+//
+// // consume messages
+// con = createConnection("cliId1");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener = new Listener();
+// consumer.setMessageListener(listener);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+//
+// assertEquals("offline consumer got all", sent, listener.count);
+// }
+//
+// public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
+// this.addCombinationValues("keepDurableSubsActive",
+// new Object[]{Boolean.TRUE, Boolean.FALSE});
+// }
+//
+// public void testJMXCountersWithOfflineSubs() throws Exception {
+// // create durable subscription 1
+// Connection con = createConnection("cliId1");
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", null, true);
+// session.close();
+// con.close();
+//
+// // restart broker
+// broker.stop();
+// createBroker(false /*deleteAllMessages*/);
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// int sent = 0;
+// for (int i = 0; i < 10; i++) {
+// sent++;
+// Message message = session.createMessage();
+// producer.send(topic, message);
+// }
+// session.close();
+// con.close();
+//
+// // consume some messages
+// con = createConnection("cliId1");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+//
+// for (int i=0; i<sent/2; i++) {
+// Message m = consumer.receive(4000);
+// assertNotNull("got message: " + i, m);
+// LOG.info("Got :" + i + ", " + m);
+// }
+//
+// // check some counters while active
+// ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+// LOG.info("active durable sub name: " + activeDurableSubName);
+// final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
+// broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+//
+// assertTrue("is active", durableSubscriptionView.isActive());
+// assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
+// assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
+// @Override
+// public boolean isSatisified() throws Exception {
+// return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
+// }
+// }));
+// assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
+//
+//
+// ObjectName destinationName = broker.getAdminView().getTopics()[0];
+// TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
+// assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+// assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+// assertEquals("inflight", 5, topicView.getInFlightCount());
+//
+// session.close();
+// con.close();
+//
+// // check some counters when inactive
+// ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
+// LOG.info("inactive durable sub name: " + inActiveDurableSubName);
+// DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
+// broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
+//
+// assertTrue("is not active", !durableSubscriptionView1.isActive());
+// assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
+// assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
+// assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
+//
+// // destination view
+// assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
+// assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+// assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
+//
+// // consume the rest
+// con = createConnection("cliId1");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+//
+// for (int i=0; i<sent/2;i++) {
+// Message m = consumer.receive(30000);
+// assertNotNull("got message: " + i, m);
+// LOG.info("Got :" + i + ", " + m);
+// }
+//
+// activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
+// LOG.info("durable sub name: " + activeDurableSubName);
+// final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
+// broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
+//
+// assertTrue("is active", durableSubscriptionView2.isActive());
+// assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
+// assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
+// @Override
+// public boolean isSatisified() throws Exception {
+// long val = durableSubscriptionView2.getDequeueCounter();
+// LOG.info("dequeue count:" + val);
+// return 10 == val;
+// }
+// }));
+// }
+//
+// public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+// this.addCombinationValues("defaultPersistenceAdapter",
+// new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+// this.addCombinationValues("usePrioritySupport",
+// new Object[]{ Boolean.TRUE, Boolean.FALSE});
+// }
+//
+// public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+// Connection con = createConnection("offCli1");
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// con = createConnection("offCli2");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// Connection con2 = createConnection("onlineCli1");
+// Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener2 = new Listener();
+// consumer2.setMessageListener(listener2);
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// int sent = 0;
+// for (int i = 0; i < 10; i++) {
+// sent++;
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "true");
+// producer.send(topic, message);
+// }
+//
+// Thread.sleep(1 * 1000);
+// session.close();
+// con.close();
+//
+// // test online subs
+// Thread.sleep(3 * 1000);
+// session2.close();
+// con2.close();
+// assertEquals(sent, listener2.count);
+//
+// // restart broker
+// broker.stop();
+// createBroker(false /*deleteAllMessages*/);
+//
+// // test offline
+// con = createConnection("offCli1");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//
+// Connection con3 = createConnection("offCli2");
+// Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+//
+// Listener listener = new Listener();
+// consumer.setMessageListener(listener);
+// Listener listener3 = new Listener();
+// consumer3.setMessageListener(listener3);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+// session3.close();
+// con3.close();
+//
+// assertEquals(sent, listener.count);
+// assertEquals(sent, listener3.count);
+// }
+//
+// public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
+// this.addCombinationValues("defaultPersistenceAdapter",
+// new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+// }
+//
+// public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
+// // create durable subscription 1
+// Connection con = createConnection("cliId1");
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// int sent = 0;
+// for (int i = 0; i < 10; i++) {
+// sent++;
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "true");
+// producer.send(topic, message);
+// }
+//
+// Thread.sleep(1 * 1000);
+//
+// // create durable subscription 2
+// Connection con2 = createConnection("cliId2");
+// Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener2 = new Listener();
+// consumer2.setMessageListener(listener2);
+//
+// assertEquals(0, listener2.count);
+// session2.close();
+// con2.close();
+//
+// // send some more
+// for (int i = 0; i < 10; i++) {
+// sent++;
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "true");
+// producer.send(topic, message);
+// }
+//
+// Thread.sleep(1 * 1000);
+// session.close();
+// con.close();
+//
+// con2 = createConnection("cliId2");
+// session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// listener2 = new Listener("cliId2");
+// consumer2.setMessageListener(listener2);
+// // test online subs
+// Thread.sleep(3 * 1000);
+//
+// assertEquals(10, listener2.count);
+//
+// // consume all messages
+// con = createConnection("cliId1");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener = new Listener("cliId1");
+// consumer.setMessageListener(listener);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+//
+// assertEquals("offline consumer got all", sent, listener.count);
+// }
+//
+// public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
+// this.addCombinationValues("defaultPersistenceAdapter",
+// new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+// }
+//
+// private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
+// public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
+// // create offline subs 1
+// Connection con = createConnection("offCli1");
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", filter, true);
+// session.close();
+// con.close();
+//
+// // create offline subs 2
+// con = createConnection("offCli2");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", filter, true);
+// session.close();
+// con.close();
+//
+// // create online subs
+// Connection con2 = createConnection("onlineCli1");
+// Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true);
+// Listener listener2 = new Listener();
+// consumer2.setMessageListener(listener2);
+//
+// // create non-durable consumer
+// Connection con4 = createConnection("nondurableCli");
+// Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer4 = session4.createConsumer(topic, filter, true);
+// Listener listener4 = new Listener();
+// consumer4.setMessageListener(listener4);
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// boolean hasRelevant = false;
+// int filtered = 0;
+// for (int i = 0; i < 100; i++) {
+// int postf = (int) (Math.random() * 9) + 1;
+// String d = "D" + postf;
+//
+// if ("D1".equals(d) || "D2".equals(d)) {
+// hasRelevant = true;
+// filtered++;
+// }
+//
+// Message message = session.createMessage();
+// message.setStringProperty("$a", "A1");
+// message.setStringProperty("$d", d);
+// producer.send(topic, message);
+// }
+//
+// Message message = session.createMessage();
+// message.setStringProperty("$a", "A1");
+// message.setBooleanProperty("$b", true);
+// message.setBooleanProperty("$c", hasRelevant);
+// producer.send(topic, message);
+//
+// if (hasRelevant)
+// filtered++;
+//
+// Thread.sleep(1 * 1000);
+// session.close();
+// con.close();
+//
+// Thread.sleep(3 * 1000);
+//
+// // test non-durable consumer
+// session4.close();
+// con4.close();
+// assertEquals(filtered, listener4.count); // succeeded!
+//
+// // test online subs
+// session2.close();
+// con2.close();
+// assertEquals(filtered, listener2.count); // succeeded!
+//
+// // test offline 1
+// con = createConnection("offCli1");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+// Listener listener = new FilterCheckListener();
+// consumer.setMessageListener(listener);
+//
+// Thread.sleep(3 * 1000);
+// session.close();
+// con.close();
+//
+// assertEquals(filtered, listener.count);
+//
+// // test offline 2
+// Connection con3 = createConnection("offCli2");
+// Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
+// Listener listener3 = new FilterCheckListener();
+// consumer3.setMessageListener(listener3);
+//
+// Thread.sleep(3 * 1000);
+// session3.close();
+// con3.close();
+//
+// assertEquals(filtered, listener3.count);
+// assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
+// }
+//
+// public void testRemovedDurableSubDeletes() throws Exception {
+// // create durable subscription 1
+// Connection con = createConnection("cliId1");
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// for (int i = 0; i < 10; i++) {
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "true");
+// producer.send(topic, message);
+// }
+//
+// Thread.sleep(1 * 1000);
+//
+// Connection con2 = createConnection("cliId1");
+// Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session2.unsubscribe("SubsId");
+// session2.close();
+// con2.close();
+//
+// // see if retroactive can consume any
+// topic = new ActiveMQTopic(topic.getPhysicalName() + "?consumer.retroactive=true");
+// con = createConnection("offCli2");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+// Listener listener = new Listener();
+// consumer.setMessageListener(listener);
+// session.close();
+// con.close();
+// assertEquals(0, listener.count);
+// }
+//
+// public void testRemovedDurableSubDeletesFromIndex() throws Exception {
+//
+// if (! (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
+// return;
+// }
+//
+// final int numMessages = 2750;
+//
+// KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+// PageFile pageFile = kahaDBPersistenceAdapter.getStore().getPageFile();
+// LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + ", fileSize:" + pageFile.getFile().length());
+//
+// long lastDiff = 0;
+// for (int repeats=0; repeats<2; repeats++) {
+//
+// LOG.info("Iteration: "+ repeats + " Count:" + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount());
+//
+// Connection con = createConnection("cliId1" + "-" + repeats);
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// for (int i = 0; i < numMessages; i++) {
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "true");
+// producer.send(topic, message);
+// }
+// con.close();
+//
+// Connection con2 = createConnection("cliId1" + "-" + repeats);
+// Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session2.unsubscribe("SubsId");
+// session2.close();
+// con2.close();
+//
+// LOG.info("PageCount " + pageFile.getPageCount() + " f:" + pageFile.getFreePageCount() + " diff: " + (pageFile.getPageCount() - pageFile.getFreePageCount()) + " fileSize:" + pageFile.getFile().length());
+//
+// if (lastDiff != 0) {
+// assertEquals("Only use X pages per iteration: " + repeats, lastDiff, pageFile.getPageCount() - pageFile.getFreePageCount());
+// }
+// lastDiff = pageFile.getPageCount() - pageFile.getFreePageCount();
+// }
+// }
+//
+// public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+// this.addCombinationValues("defaultPersistenceAdapter",
+// new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+// }
+//
+// public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+//
+// if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) {
+// // https://issues.apache.org/jira/browse/AMQ-4296
+// return;
+// }
+//
+// // create offline subs 1
+// Connection con = createConnection("offCli1");
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // create offline subs 2
+// con = createConnection("offCli2");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// int filtered = 0;
+// for (int i = 0; i < 10; i++) {
+// boolean filter = (int) (Math.random() * 2) >= 1;
+// if (filter)
+// filtered++;
+//
+// Message message = session.createMessage();
+// message.setStringProperty("filter", filter ? "true" : "false");
+// producer.send(topic, message);
+// }
+//
+// LOG.info("sent: " + filtered);
+// Thread.sleep(1 * 1000);
+// session.close();
+// con.close();
+//
+// // restart broker
+// Thread.sleep(3 * 1000);
+// broker.stop();
+// createBroker(false /*deleteAllMessages*/);
+//
+// // send more messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// producer = session.createProducer(null);
+//
+// for (int i = 0; i < 10; i++) {
+// boolean filter = (int) (Math.random() * 2) >= 1;
+// if (filter)
+// filtered++;
+//
+// Message message = session.createMessage();
+// message.setStringProperty("filter", filter ? "true" : "false");
+// producer.send(topic, message);
+// }
+//
+// LOG.info("after restart, total sent with filter='true': " + filtered);
+// Thread.sleep(1 * 1000);
+// session.close();
+// con.close();
+//
+// // test offline subs
+// con = createConnection("offCli1");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener = new Listener("1>");
+// consumer.setMessageListener(listener);
+//
+// Connection con3 = createConnection("offCli2");
+// Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// Listener listener3 = new Listener();
+// consumer3.setMessageListener(listener3);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+// session3.close();
+// con3.close();
+//
+// assertEquals(filtered, listener.count);
+// assertEquals(filtered, listener3.count);
+// }
+//
+// public void initCombosForTestOfflineAfterRestart() throws Exception {
+// this.addCombinationValues("defaultPersistenceAdapter",
+// new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC});
+// }
+//
+// public void testOfflineSubscriptionAfterRestart() throws Exception {
+// // create offline subs 1
+// Connection con = createConnection("offCli1");
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false);
+// Listener listener = new Listener();
+// consumer.setMessageListener(listener);
+//
+// // send messages
+// MessageProducer producer = session.createProducer(null);
+//
+// int sent = 0;
+// for (int i = 0; i < 10; i++) {
+// sent++;
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "false");
+// producer.send(topic, message);
+// }
+//
+// LOG.info("sent: " + sent);
+// Thread.sleep(5 * 1000);
+// session.close();
+// con.close();
+//
+// assertEquals(sent, listener.count);
+//
+// // restart broker
+// Thread.sleep(3 * 1000);
+// broker.stop();
+// createBroker(false /*deleteAllMessages*/);
+//
+// // send more messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// producer = session.createProducer(null);
+//
+// for (int i = 0; i < 10; i++) {
+// sent++;
+// Message message = session.createMessage();
+// message.setStringProperty("filter", "false");
+// producer.send(topic, message);
+// }
+//
+// LOG.info("after restart, sent: " + sent);
+// Thread.sleep(1 * 1000);
+// session.close();
+// con.close();
+//
+// // test offline subs
+// con = createConnection("offCli1");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+// consumer.setMessageListener(listener);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+//
+// assertEquals(sent, listener.count);
+// }
+//
+// public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
+// // create offline subs 1
+// Connection con = createConnection("offCli1");
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+// session.close();
+// con.close();
+//
+// // create offline subs 2
+// con = createConnection("offCli2");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", null, true);
+// session.close();
+// con.close();
+//
+// // send messages
+// con = createConnection();
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer producer = session.createProducer(null);
+//
+// int sent = 0;
+// for (int i = 0; i < 10; i++) {
+// boolean filter = (int) (Math.random() * 2) >= 1;
+//
+// sent++;
+//
+// Message message = session.createMessage();
+// message.setStringProperty("filter", filter ? "true" : "false");
+// producer.send(topic, message);
+// }
+//
+// Thread.sleep(1 * 1000);
+//
+// Connection con2 = createConnection("offCli1");
+// Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session2.unsubscribe("SubsId");
+// session2.close();
+// con2.close();
+//
+// // consume all messages
+// con = createConnection("offCli2");
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+// Listener listener = new Listener("SubsId");
+// consumer.setMessageListener(listener);
+//
+// Thread.sleep(3 * 1000);
+//
+// session.close();
+// con.close();
+//
+// assertEquals("offline consumer got all", sent, listener.count);
+// }
+//
+// public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
+// final int messageCount = 1000;
+// Connection con = null;
+// Session session = null;
+// final int numConsumers = 10;
+// for (int i = 0; i <= numConsumers; i++) {
+// con = createConnection("cli" + i);
+// session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// session.createDurableSubscriber(topic, "SubsId", null, true);
+// session.close();
+// con.close();
+// }
+//
+// class CheckForDupsClient implements Runnable {
+// HashSet<Long> ids = new HashSet<Long>();
+// final int id;
+//
+// public CheckForDupsClient(int id) {
+// this.id = id;
+// }
+//
+// @Override
+// public void run() {
+// try {
+// Connection con = createConnection("cli" + id);
+// Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// for (int j=0;j<2;j++) {
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+// for (int i = 0; i < messageCount/2; i++) {
+// Message message = consumer.receive(4000);
+// assertNotNull(message);
+// long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
+// assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
+// }
+// consumer.close();
+// }
+//
+// // verify no duplicates left
+// MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+// Message message = consumer.receive(4000);
+// if (message != null) {
+// long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
+// assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
+// }
+// assertNull(message);
+//
+//
+// session.close();
+// con.close();
+// } catch (Throwable e) {
+// e.printStackTrace();
+// exceptions.add(e);
+// }
+// }
+// }
+//
+// final String payLoad = new String(new byte[1000]);
+// con = createConnection();
+// final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED);
+// MessageProducer producer = sendSession.createProducer(topic);
+// for (int i = 0; i < messageCount; i++) {
+// producer.send(sendSession.createTextMessage(payLoad));
+// }
+//
+// ExecutorService executorService = Executors.newCachedThreadPool();
+//
+// // concurrent commit and activate
+// executorService.execute(new Runnable() {
+// @Override
+// public void run() {
+// try {
+// sendSession.commit();
+// } catch (JMSException e) {
+// e.printStackTrace();
+// exceptions.add(e);
+// }
+// }
+// });
+// for (int i = 0; i < numConsumers; i++) {
+// executorService.execute(new CheckForDupsClient(i));
+// }
+//
+// executorService.shutdown();
+// executorService.awaitTermination(5, TimeUnit.MINUTES);
+// con.close();
+//
+// assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+// }
+//
+// public void testOrderOnActivateDeactivate() throws Exception {
+// for (int i=0;i<10;i++) {
+// LOG.info("Iteration: " + i);
+// doTestOrderOnActivateDeactivate();
+// broker.stop();
+// createBroker(true /*deleteAllMessages*/);
+// }
+// }
public void doTestOrderOnActivateDeactivate() throws Exception {
final int messageCount = 1000;
@@ -1229,217 +1223,217 @@ public class DurableSubscriptionOfflineT
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
}
- public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
- // create offline subs 1
- Connection con = createConnection("offCli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int filtered = 0;
- for (int i = 0; i < 10; i++) {
- boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
- if (filter)
- filtered++;
-
- Message message = session.createMessage();
- message.setStringProperty("filter", filter ? "true" : "false");
- producer.send(topic, message);
- }
-
- LOG.info("sent: " + filtered);
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- // test offline subs
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.unsubscribe("SubsId");
- session.close();
- con.close();
-
- con = createConnection("offCli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
-
- Thread.sleep(3 * 1000);
-
- session.close();
- con.close();
-
- assertEquals(0, listener.count);
- }
-
- public void testAllConsumed() throws Exception {
- final String filter = "filter = 'true'";
- Connection con = createConnection("cli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", filter, true);
- session.close();
- con.close();
-
- con = createConnection("cli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", filter, true);
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- for (int i = 0; i < 10; i++) {
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- producer.send(topic, message);
- sent++;
- }
-
- LOG.info("sent: " + sent);
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- con = createConnection("cli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
- Listener listener = new Listener();
- consumer.setMessageListener(listener);
- Thread.sleep(3 * 1000);
- session.close();
- con.close();
-
- assertEquals(sent, listener.count);
-
- LOG.info("cli2 pull 2");
- con = createConnection("cli2");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
- assertNotNull("got message", consumer.receive(2000));
- assertNotNull("got message", consumer.receive(2000));
- session.close();
- con.close();
-
- // send messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(null);
-
- sent = 0;
- for (int i = 0; i < 2; i++) {
- Message message = session.createMessage();
- message.setStringProperty("filter", i==1 ? "true" : "false");
- producer.send(topic, message);
- sent++;
- }
- LOG.info("sent: " + sent);
- Thread.sleep(1 * 1000);
- session.close();
- con.close();
-
- LOG.info("cli1 again, should get 1 new ones");
- con = createConnection("cli1");
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
- listener = new Listener();
- consumer.setMessageListener(listener);
- Thread.sleep(3 * 1000);
- session.close();
- con.close();
-
- assertEquals(1, listener.count);
- }
-
- // https://issues.apache.org/jira/browse/AMQ-3190
- public void testNoMissOnMatchingSubAfterRestart() throws Exception {
-
- final String filter = "filter = 'true'";
- Connection con = createConnection("cli1");
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- session.createDurableSubscriber(topic, "SubsId", filter, true);
- session.close();
- con.close();
-
- // send unmatched messages
- con = createConnection();
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(null);
-
- int sent = 0;
- // message for cli1 to keep it interested
- Message message = session.createMessage();
- message.setStringProperty("filter", "true");
- message.setIntProperty("ID", 0);
- producer.send(topic, message);
- sent++;
-
- for (int i = sent; i < 10; i++) {
- message = session.createMessage();
- message.setStringProperty("filter", "false");
- message.setIntProperty("ID", i);
- producer.send(topic, message);
- sent++;
- }
- con.close();
- LOG.info("sent: " + sent);
-
- // new sub at id 10
- con = createConnection("cli2");
[... 590 lines stripped ...]