You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 14:42:29 UTC
svn commit: r786040 [3/6] - in /activemq/sandbox/activemq-flow:
activemq-all/src/test/java/org/apache/activemq/legacy/
activemq-all/src/test/java/org/apache/activemq/legacy/broker/
activemq-all/src/test/java/org/apache/activemq/legacy/broker/advisory/ ...
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.apollo.CombinationTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.legacy.broker.BrokerFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+
+/**
+ * Test cases used to test the JMS message consumer.
+ *
+ * @version $Revision$
+ */
+public class JmsTestSupport extends CombinationTestSupport {
+
+ static final private AtomicLong TEST_COUNTER = new AtomicLong();
+ public String userName;
+ public String password;
+
+ protected ConnectionFactory factory;
+ protected ActiveMQConnection connection;
+ protected BrokerService broker;
+
+ protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
+
+ // /////////////////////////////////////////////////////////////////
+ //
+ // Test support methods.
+ //
+ // /////////////////////////////////////////////////////////////////
+ protected ActiveMQDestination createDestination(Session session, byte type) throws JMSException {
+ String testMethod = getName();
+ if( testMethod.indexOf(" ")>0 ) {
+ testMethod = testMethod.substring(0, testMethod.indexOf(" "));
+ }
+ String name = "TEST." + getClass().getName() + "." +testMethod+"."+TEST_COUNTER.getAndIncrement();
+ switch (type) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ return (ActiveMQDestination)session.createQueue(name);
+ case ActiveMQDestination.TOPIC_TYPE:
+ return (ActiveMQDestination)session.createTopic(name);
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ return (ActiveMQDestination)session.createTemporaryQueue();
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ return (ActiveMQDestination)session.createTemporaryTopic();
+ default:
+ throw new IllegalArgumentException("type: " + type);
+ }
+ }
+
+ protected void sendMessages(Destination destination, int count) throws Exception {
+ ConnectionFactory factory = createConnectionFactory();
+ Connection connection = factory.createConnection();
+ connection.start();
+ sendMessages(connection, destination, count);
+ connection.close();
+ }
+
+ protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sendMessages(session, destination, count);
+ session.close();
+ }
+
+ protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 0; i < count; i++) {
+ producer.send(session.createTextMessage("" + i));
+ }
+ producer.close();
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory("vm://localhost");
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ if (System.getProperty("basedir") == null) {
+ File file = new File(".");
+ System.setProperty("basedir", file.getAbsolutePath());
+ }
+
+ broker = createBroker();
+ broker.start();
+ factory = createConnectionFactory();
+ connection = (ActiveMQConnection)factory.createConnection(userName, password);
+ connections.add(connection);
+ }
+
+ protected void tearDown() throws Exception {
+ for (Iterator iter = connections.iterator(); iter.hasNext();) {
+ Connection conn = (Connection)iter.next();
+ try {
+ conn.close();
+ } catch (Throwable e) {
+ }
+ iter.remove();
+ }
+ broker.stop();
+ super.tearDown();
+ }
+
+ protected void safeClose(Connection c) {
+ try {
+ c.close();
+ } catch (Throwable e) {
+ }
+ }
+
+ protected void safeClose(Session s) {
+ try {
+ s.close();
+ } catch (Throwable e) {
+ }
+ }
+
+ protected void safeClose(MessageConsumer c) {
+ try {
+ c.close();
+ } catch (Throwable e) {
+ }
+ }
+
+ protected void safeClose(MessageProducer p) {
+ try {
+ p.close();
+ } catch (Throwable e) {
+ }
+ }
+
+ protected void profilerPause(String prompt) throws IOException {
+ if (System.getProperty("profiler") != null) {
+ pause(prompt);
+ }
+ }
+
+ protected void pause(String prompt) throws IOException {
+ System.out.println();
+ System.out.println(prompt + "> Press enter to continue: ");
+ while (System.in.read() != '\n') {
+ }
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.legacy.broker.BrokerFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.TransportConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Small burn test moves sends a moderate amount of messages through the broker,
+ * to checking to make sure that the broker does not lock up after a while of
+ * sustained messaging.
+ *
+ * @version $Revision$
+ */
+public class LoadTestBurnIn extends JmsTestSupport {
+ private static final transient Log LOG = LogFactory.getLog(LoadTestBurnIn.class);
+
+ public ActiveMQDestination destination;
+ public int deliveryMode;
+ public byte destinationType;
+ public boolean durableConsumer;
+ public int messageCount = 50000;
+ public int messageSize = 1024;
+
+ public static Test suite() {
+ return suite(LoadTestBurnIn.class);
+ }
+
+ protected void setUp() throws Exception {
+ LOG.info("Start: " + getName());
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ try {
+ super.tearDown();
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ } finally {
+ LOG.info("End: " + getName());
+ }
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ return BrokerFactory.createBroker(new URI("broker://(tcp://localhost:0)?useJmx=true"));
+ // return BrokerFactory.createBroker(new
+ // URI("xbean:org/apache/activemq/broker/store/loadtester.xml"));
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+ return new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0))
+ .getServer().getConnectURI());
+ }
+
+ public void initCombosForTestSendReceive() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+ addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE});
+ addCombinationValues("messageSize", new Object[] {Integer.valueOf(101), Integer.valueOf(102),
+ Integer.valueOf(103), Integer.valueOf(104),
+ Integer.valueOf(105), Integer.valueOf(106),
+ Integer.valueOf(107), Integer.valueOf(108)});
+ }
+
+ public void testSendReceive() throws Exception {
+
+ // Durable consumer combination is only valid with topics
+ if (durableConsumer && destinationType != ActiveMQDestination.TOPIC_TYPE) {
+ return;
+ }
+
+ connection.setClientID(getName());
+ connection.getPrefetchPolicy().setAll(1000);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer;
+ if (durableConsumer) {
+ consumer = session.createDurableSubscriber((Topic)destination, "sub1:"
+ + System.currentTimeMillis());
+ } else {
+ consumer = session.createConsumer(destination);
+ }
+ profilerPause("Ready: ");
+
+ final CountDownLatch producerDoneLatch = new CountDownLatch(1);
+
+ // Send the messages, async
+ new Thread() {
+ public void run() {
+ Connection connection2 = null;
+ try {
+ connection2 = factory.createConnection();
+ Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(deliveryMode);
+ for (int i = 0; i < messageCount; i++) {
+ BytesMessage m = session.createBytesMessage();
+ m.writeBytes(new byte[messageSize]);
+ producer.send(m);
+ }
+ producer.close();
+ } catch (JMSException e) {
+ e.printStackTrace();
+ } finally {
+ safeClose(connection2);
+ producerDoneLatch.countDown();
+ }
+
+ }
+ }.start();
+
+ // Make sure all the messages were delivered.
+ Message message = null;
+ for (int i = 0; i < messageCount; i++) {
+ message = consumer.receive(5000);
+ assertNotNull("Did not get message: " + i, message);
+ }
+
+ profilerPause("Done: ");
+
+ assertNull(consumer.receiveNoWait());
+ message.acknowledge();
+
+ // Make sure the producer thread finishes.
+ assertTrue(producerDoneLatch.await(5, TimeUnit.SECONDS));
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupDelayedTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupDelayedTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupDelayedTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupDelayedTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.TransportConnector;
+import org.apache.activemq.legacy.broker.region.policy.PolicyEntry;
+import org.apache.activemq.legacy.broker.region.policy.PolicyMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+
+/**
+ * Sends messages in three groups (A, B and C) to one queue and checks how
+ * they are shared between three consumers for different values of the
+ * {@code consumersBeforeDispatchStarts} and {@code timeBeforeDispatchStarts}
+ * destination policy settings.
+ */
+public class MessageGroupDelayedTest extends JmsTestSupport {
+    public static final Log log = LogFactory.getLog(MessageGroupDelayedTest.class);
+    // NOTE: connection and broker hide the fields of the same name in JmsTestSupport.
+    protected Connection connection;
+    protected Session session;
+    protected MessageProducer producer;
+    protected Destination destination;
+
+    public int consumersBeforeDispatchStarts;
+    public int timeBeforeDispatchStarts;
+
+    BrokerService broker;
+    protected TransportConnector connector;
+
+    protected HashMap<String, Integer> messageCount = new HashMap<String, Integer>();
+    protected HashMap<String, Set<String>> messageGroups = new HashMap<String, Set<String>>();
+
+    public static Test suite() {
+        return suite(MessageGroupDelayedTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /**
+     * Starts the broker and opens a connection with a prefetch of 1.
+     * Does not call {@code super.setUp()}, which would create and start a
+     * second broker through the overridden {@link #createBroker()}.
+     */
+    public void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=1");
+        connection = connFactory.createConnection();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = new ActiveMQQueue("test-queue2");
+        producer = session.createProducer(destination);
+        connection.start();
+    }
+
+    /**
+     * Builds a non-persistent broker without JMX whose default destination
+     * policy uses the two combination parameters, and adds a TCP connector
+     * on port 0 that is remembered in {@link #connector}.
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(false);
+
+        // Setup a destination policy where it takes only 1 message at a time.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        log.info("testing with consumersBeforeDispatchStarts=" + consumersBeforeDispatchStarts + " and timeBeforeDispatchStarts=" + timeBeforeDispatchStarts);
+        policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
+        policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
+        policyMap.setDefaultEntry(policy);
+        service.setDestinationPolicy(policyMap);
+
+        connector = service.addConnector("tcp://localhost:0");
+        return service;
+    }
+
+    /**
+     * Closes the JMS resources and stops the broker started by
+     * {@link #setUp()}. The broker is stopped even if a close fails, so that
+     * it does not outlive the test.
+     */
+    public void tearDown() throws Exception {
+        try {
+            producer.close();
+            session.close();
+            connection.close();
+        } finally {
+            broker.stop();
+        }
+    }
+
+    public void initCombosForTestDelayedDirectConnectionListener() {
+        addCombinationValues("consumersBeforeDispatchStarts", new Object[] {0, 3, 5});
+        addCombinationValues("timeBeforeDispatchStarts", new Object[] {0, 100});
+    }
+
+    /**
+     * Sends 10 messages to each of the groups A, B and C, lets three workers
+     * consume them and, unless both policy parameters are 0, expects every
+     * worker to have received 10 messages from exactly one group.
+     */
+    public void testDelayedDirectConnectionListener() throws Exception {
+
+        for (int i = 0; i < 10; i++) {
+            Message msga = session.createTextMessage("hello a");
+            msga.setStringProperty("JMSXGroupID", "A");
+            producer.send(msga);
+            Message msgb = session.createTextMessage("hello b");
+            msgb.setStringProperty("JMSXGroupID", "B");
+            producer.send(msgb);
+            Message msgc = session.createTextMessage("hello c");
+            msgc.setStringProperty("JMSXGroupID", "C");
+            producer.send(msgc);
+        }
+        log.info("30 messages sent to group A/B/C");
+
+        // Remaining messages per group (A, B, C), shared by all workers.
+        int[] counters = {10, 10, 10};
+
+        CountDownLatch startSignal = new CountDownLatch(1);
+        // One count per worker: wait until every worker has finished and
+        // released its consumer before the results are read.
+        CountDownLatch doneSignal = new CountDownLatch(3);
+
+        messageCount.put("worker1", 0);
+        messageGroups.put("worker1", new HashSet<String>());
+        Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups);
+        messageCount.put("worker2", 0);
+        messageGroups.put("worker2", new HashSet<String>());
+        Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups);
+        messageCount.put("worker3", 0);
+        messageGroups.put("worker3", new HashSet<String>());
+        Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups);
+
+        new Thread(worker1).start();
+        new Thread(worker2).start();
+        new Thread(worker3).start();
+
+        startSignal.countDown();
+        doneSignal.await();
+
+        // check results
+        if (consumersBeforeDispatchStarts == 0 && timeBeforeDispatchStarts == 0) {
+            log.info("Ignoring results because both parameters are 0");
+            return;
+        }
+
+        for (String worker : messageCount.keySet()) {
+            String summary = "worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker);
+            log.info(summary);
+            assertEquals(summary, 10, messageCount.get(worker).intValue());
+            assertEquals(summary, 1, messageGroups.get(worker).size());
+        }
+
+    }
+
+    /**
+     * Consumes from the queue until all three group counters reach zero,
+     * recording how many messages it received and from which groups.
+     * The counters array and the two result maps are shared by all workers;
+     * every access goes through a lock on the counters array.
+     */
+    private static final class Worker implements Runnable {
+        private Connection connection = null;
+        private Destination queueName = null;
+        private String workerName = null;
+        private CountDownLatch startSignal = null;
+        private CountDownLatch doneSignal = null;
+        private int[] counters = null;
+        private HashMap<String, Integer> messageCount;
+        private HashMap<String, Set<String>> messageGroups;
+
+        private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>> messageGroups) {
+            this.connection = connection;
+            this.queueName = queueName;
+            this.workerName = workerName;
+            this.startSignal = startSignal;
+            this.doneSignal = doneSignal;
+            this.counters = counters;
+            this.messageCount = messageCount;
+            this.messageGroups = messageGroups;
+        }
+
+        /** Records one more message from {@code group} for this worker; caller holds the counters lock. */
+        private void update(String group) {
+            int msgCount = messageCount.get(workerName);
+            messageCount.put(workerName, msgCount + 1);
+            Set<String> groups = messageGroups.get(workerName);
+            groups.add(group);
+            messageGroups.put(workerName, groups);
+        }
+
+        /** Decrements the counter at {@code index} and records the message under the shared lock. */
+        private void consumed(int index, String group) {
+            synchronized (counters) {
+                --counters[index];
+                update(group);
+            }
+        }
+
+        /** Returns true once the counters of all three groups have reached zero. */
+        private boolean allGroupsDrained() {
+            synchronized (counters) {
+                return counters[0] == 0 && counters[1] == 0 && counters[2] == 0;
+            }
+        }
+
+        public void run() {
+            Session sess = null;
+            MessageConsumer consumer = null;
+            try {
+                log.info(workerName);
+                startSignal.await();
+                sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                consumer = sess.createConsumer(queueName);
+
+                while (!allGroupsDrained()) {
+                    Message msg = consumer.receive(500);
+                    if (msg == null) {
+                        continue;
+                    }
+
+                    String group = msg.getStringProperty("JMSXGroupID");
+
+                    // Each group is processed at a different speed.
+                    if ("A".equals(group)) {
+                        consumed(0, group);
+                        Thread.sleep(500);
+                    } else if ("B".equals(group)) {
+                        consumed(1, group);
+                        Thread.sleep(100);
+                    } else if ("C".equals(group)) {
+                        consumed(2, group);
+                        Thread.sleep(10);
+                    } else {
+                        log.warn("unknown group");
+                    }
+                    // As before, a message is not acknowledged once all counters are zero.
+                    if (!allGroupsDrained()) {
+                        msg.acknowledge();
+                    }
+                }
+                log.info(workerName + " done...");
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag for whoever owns this thread.
+                Thread.currentThread().interrupt();
+                log.error(workerName + " interrupted", e);
+            } catch (Exception e) {
+                log.error(workerName + " failed", e);
+            } finally {
+                // Always release the consumer and session, and always signal
+                // completion, so a failing worker cannot hang the test.
+                closeQuietly(consumer, sess);
+                doneSignal.countDown();
+            }
+        }
+
+        /** Closes the consumer and then the session, logging instead of propagating failures. */
+        private void closeQuietly(MessageConsumer consumer, Session sess) {
+            try {
+                if (consumer != null) {
+                    consumer.close();
+                }
+            } catch (Exception e) {
+                log.warn(workerName + " could not close its consumer", e);
+            }
+            try {
+                if (sess != null) {
+                    sess.close();
+                }
+            } catch (Exception e) {
+                log.warn(workerName + " could not close its session", e);
+            }
+        }
+    }
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,164 @@
+package org.apache.activemq.legacy.test1;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.apollo.CombinationTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tests delivery of messages that carry a {@code JMSXGroupID} to consumers
+ * of a single queue.
+ */
+public class MessageGroupTest extends JmsTestSupport {
+
+    // Log under this class, not under CombinationTestSupport.
+    private static final Log LOG = LogFactory.getLog(MessageGroupTest.class);
+
+    /**
+     * Sends four messages of one group, reads three on the first consumer,
+     * then closes it and expects the fourth on a second consumer.
+     */
+    public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer1 = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages.
+        for (int i = 0; i < 4; i++) {
+            TextMessage message = session.createTextMessage("message " + i);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            message.setIntProperty("JMSXGroupSeq", i + 1);
+            LOG.info("sending message: " + message);
+            producer.send(message);
+        }
+
+        // All the messages should have been sent down connection 1.. just get
+        // the first 3
+        for (int i = 0; i < 3; i++) {
+            TextMessage m1 = (TextMessage)consumer1.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+            // JUnit expects (expected, actual).
+            assertEquals(i + 1, m1.getIntProperty("JMSXGroupSeq"));
+        }
+
+        // Setup a second connection
+        Connection connection1 = factory.createConnection(userName, password);
+        // Register it so that tearDown() closes it.
+        connections.add(connection1);
+        connection1.start();
+        Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createConsumer(destination);
+
+        // Close the first consumer.
+        consumer1.close();
+
+        // The last message should now go to the second consumer.
+        for (int i = 0; i < 1; i++) {
+            TextMessage m1 = (TextMessage)consumer2.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+            assertEquals(4 + i, m1.getIntProperty("JMSXGroupSeq"));
+        }
+
+        //assert that there are no other messages left for the consumer 2
+        Message m = consumer2.receive(100);
+        assertNull("consumer 2 has some messages left", m);
+    }
+
+    /**
+     * Sends one grouped message before any consumer exists and expects the
+     * consumer created afterwards to receive it with
+     * {@code JMSXGroupFirstForConsumer} set.
+     */
+    public void testAddingConsumer() throws Exception {
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(destination);
+
+        TextMessage message = session.createTextMessage("message");
+        message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+
+        LOG.info("sending message: " + message);
+        producer.send(message);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Bounded wait: an untimed receive() would hang the test instead of
+        // letting the assertion below report a missing message.
+        TextMessage msg = (TextMessage)consumer.receive(5000);
+        assertNotNull(msg);
+        boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer");
+        assertTrue(first);
+    }
+
+    /**
+     * Checks that after a message with {@code JMSXGroupSeq} of -1 is sent,
+     * later messages of the same group are delivered to a second consumer.
+     */
+    public void testClosingMessageGroup() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer1 = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages.
+        for (int i = 0; i < 4; i++) {
+            TextMessage message = session.createTextMessage("message " + i);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            LOG.info("sending message: " + message);
+            producer.send(message);
+        }
+
+        // All the messages should have been sent down consumer1.. just get
+        // the first 3
+        for (int i = 0; i < 3; i++) {
+            TextMessage m1 = (TextMessage)consumer1.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+        }
+
+        // Setup a second consumer
+        Connection connection1 = factory.createConnection(userName, password);
+        // Register it so that tearDown() closes it.
+        connections.add(connection1);
+        connection1.start();
+        Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createConsumer(destination);
+
+        //assert that there are no messages for the consumer 2
+        Message m = consumer2.receive(100);
+        assertNull("consumer 2 has some messages", m);
+
+        // Close the group
+        TextMessage message = session.createTextMessage("message " + 5);
+        message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+        message.setIntProperty("JMSXGroupSeq", -1);
+        LOG.info("sending message: " + message);
+        producer.send(message);
+
+        //Send some more messages
+        for (int i = 0; i < 4; i++) {
+            message = session.createTextMessage("message " + i);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            LOG.info("sending message: " + message);
+            producer.send(message);
+        }
+
+        // Receive the fourth message
+        TextMessage m1 = (TextMessage)consumer1.receive(500);
+        assertNotNull("m1 is null for index: " + 4, m1);
+
+        // Receive the closing message
+        m1 = (TextMessage)consumer1.receive(500);
+        assertNotNull("m1 is null for index: " + 5, m1);
+
+        //assert that there are no messages for the consumer 1
+        m = consumer1.receive(100);
+        assertNull("consumer 1 has some messages left", m);
+
+        // The messages should now go to the second consumer.
+        for (int i = 0; i < 4; i++) {
+            m1 = (TextMessage)consumer2.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+        }
+
+    }
+
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.region.policy.PolicyEntry;
+import org.apache.activemq.legacy.broker.region.policy.PolicyMap;
+import org.apache.activemq.legacy.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.legacy.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+
+public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
+
+    /**
+     * Builds a non-persistent, JMX-less broker whose default destination policy
+     * has a memory limit of 1, and which is configured to fail sends when there
+     * is no space rather than block them.
+     *
+     * @return the configured (not yet started here) broker service
+     * @throws Exception if the TCP connector cannot be added
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+
+        // Default policy for every destination: a memory limit of 1 with VM-based
+        // pending message storage, so a destination fills up almost immediately.
+        PolicyMap destinationPolicies = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setMemoryLimit(1);
+        defaultEntry.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+        defaultEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        destinationPolicies.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(destinationPolicies);
+
+        // The distinguishing setting of this subclass: fail the send when out of space.
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+
+        // Port 0 in the connector URI; the resulting connector is kept for building client factories.
+        connector = broker.addConnector("tcp://localhost:0");
+        return broker;
+    }
+
+ /**
+ * Intentionally empty override: replaces the inherited test of the same name
+ * with a no-op for this sendFailIfNoSpace configuration.
+ */
+ @Override
+ public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+ // with sendFailIfNoSpace set, there is no blocking of the connection
+ }
+
+    /**
+     * Checks that, once the broker has reported a resource limit to an
+     * asynchronously sending producer, ten messages can still be consumed and
+     * acknowledged from the queue.
+     *
+     * A background "Filler" thread keeps sending until {@code keepGoing} is
+     * cleared; it is always told to stop, even when an assertion fails.
+     *
+     * @throws Exception on any JMS or set-up failure
+     */
+    @Override
+    public void testPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        // with sendFail, there must be no flow control window;
+        // sendFail is an alternative flow control mechanism that does not block
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+        Thread thread = new Thread("Filler") {
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    try {
+                        producer.send(session.createTextMessage("Test message"));
+                        if (gotResourceException.get()) {
+                            // do not flood the broker with requests when full as we are sending async and they
+                            // will be limited by the network buffers
+                            Thread.sleep(200);
+                        }
+                    } catch (Exception e) {
+                        // with async send, there will be no exceptions
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+        thread.start();
+        try {
+            waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+
+            // resourceException on second message, resumption if we
+            // can receive 10
+            MessageConsumer consumer = session.createConsumer(queueA);
+            for (int idx = 0; idx < 10; ++idx) {
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                // receive() returns null on timeout; fail with a message instead of an NPE
+                assertNotNull("no message received for index: " + idx, msg);
+                msg.acknowledge();
+            }
+        } finally {
+            // always stop the filler thread, including when an assertion above fails
+            keepGoing.set(false);
+        }
+    }
+
+
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connector.getConnectUri());
+ connectionFactory.setExceptionListener(new ExceptionListener() {
+ public void onException(JMSException arg0) {
+ if (arg0 instanceof ResourceAllocationException) {
+ gotResourceException.set(true);
+ }
+ }
+ });
+ return connectionFactory;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.TransportConnector;
+import org.apache.activemq.legacy.broker.region.policy.PolicyEntry;
+import org.apache.activemq.legacy.broker.region.policy.PolicyMap;
+import org.apache.activemq.legacy.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.legacy.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+public class ProducerFlowControlTest extends JmsTestSupport {
+
+ // Primary test queue; several tests fill it until the producer blocks or a resource limit is reported.
+ ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+ // Second queue, used to check whether another producer on the same connection can still publish.
+ ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
+ // Broker transport connector, assigned in createBroker(); its connect URI is used to build connection factories.
+ protected TransportConnector connector;
+ // Connection under test; assigned by each test method and force-closed in tearDown().
+ protected ActiveMQConnection connection;
+ // used to test sendFailIfNoSpace on SystemUsage
+ protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
+
+    /**
+     * With a 64 KiB producer window configured, filling QUEUE.A must not stop a
+     * second producer on the same connection from publishing to QUEUE.B.
+     *
+     * @throws Exception on any JMS or set-up failure
+     */
+    public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory windowedFactory = (ActiveMQConnectionFactory)createConnectionFactory();
+        windowedFactory.setProducerWindowSize(1024 * 64);
+        connection = (ActiveMQConnection)windowedFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session ackSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer queueBConsumer = ackSession.createConsumer(queueB);
+
+        // Saturate queue A first. A few sends should go through without blocking
+        // until the producer window is used up.
+        fillQueue(queueA);
+
+        // Publishing to queue B should complete promptly, because the connection
+        // itself should not be blocked. Each message is then consumed and acknowledged.
+        for (String text : new String[] {"Message 1", "Message 2"}) {
+            CountDownLatch sent = asyncSendTo(queueB, text);
+            assertTrue(sent.await(2, TimeUnit.SECONDS));
+
+            TextMessage received = (TextMessage)queueBConsumer.receive();
+            assertEquals(text, received.getText());
+            received.acknowledge();
+        }
+    }
+
+    /**
+     * Checks that a producer which stalled on a full queue resumes sending once
+     * a consumer has drained some messages.
+     *
+     * A background "Filler" thread clears {@code done} before every send. After
+     * the producer is detected as stalled, five messages are consumed; the
+     * test passes if the filler has cleared {@code done} again one second later.
+     * The filler is always told to stop, even when an assertion fails.
+     *
+     * @throws Exception on any JMS or set-up failure
+     */
+    public void testPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setProducerWindowSize(1024 * 64);
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+        Thread thread = new Thread("Filler") {
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    try {
+                        producer.send(session.createTextMessage("Test message"));
+                    } catch (JMSException ignored) {
+                        // send failures are ignored on purpose: the test only observes
+                        // whether the done flag keeps being reset
+                    }
+                }
+            }
+        };
+        thread.start();
+        try {
+            waitForBlockedOrResourceLimit(done);
+
+            // after receiving messages, the producer should continue sending messages
+            // (done == false)
+            MessageConsumer consumer = session.createConsumer(queueA);
+            for (int idx = 0; idx < 5; ++idx) {
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                // receive() returns null on timeout; fail with a message instead of an NPE
+                assertNotNull("no message received for index: " + idx, msg);
+                msg.acknowledge();
+            }
+            Thread.sleep(1000);
+            keepGoing.set(false);
+
+            assertFalse(done.get());
+        } finally {
+            // always stop the filler thread, including when an assertion above fails
+            keepGoing.set(false);
+        }
+    }
+
+    /**
+     * With always-sync sends, filling QUEUE.A must not stop a second producer
+     * on the same connection from publishing to QUEUE.B.
+     *
+     * @throws Exception on any JMS or set-up failure
+     */
+    public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory syncFactory = (ActiveMQConnectionFactory)createConnectionFactory();
+        syncFactory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection)syncFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session ackSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer queueBConsumer = ackSession.createConsumer(queueB);
+
+        // Saturate queue A: the first send should not block, but the rest will.
+        fillQueue(queueA);
+
+        // Sends to queue B should not block; each message is then consumed and acknowledged.
+        for (String text : new String[] {"Message 1", "Message 2"}) {
+            CountDownLatch sent = asyncSendTo(queueB, text);
+            assertTrue(sent.await(2, TimeUnit.SECONDS));
+
+            TextMessage received = (TextMessage)queueBConsumer.receive();
+            assertEquals(text, received.getText());
+            received.acknowledge();
+        }
+    }
+
+    /**
+     * Sanity check with always-sync sends: two messages published to QUEUE.A
+     * one after the other are each received and acknowledged.
+     *
+     * @throws Exception on any JMS or set-up failure
+     */
+    public void testSimpleSendReceive() throws Exception {
+        ActiveMQConnectionFactory syncFactory = (ActiveMQConnectionFactory)createConnectionFactory();
+        syncFactory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection)syncFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session ackSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer queueAConsumer = ackSession.createConsumer(queueA);
+
+        // Sends to queue A should not block, since each message is consumed before the next is sent.
+        for (String text : new String[] {"Message 1", "Message 2"}) {
+            CountDownLatch sent = asyncSendTo(queueA, text);
+            assertTrue(sent.await(2, TimeUnit.SECONDS));
+
+            TextMessage received = (TextMessage)queueAConsumer.receive();
+            assertEquals(text, received.getText());
+            received.acknowledge();
+        }
+    }
+
+ public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+ ConnectionFactory factory = createConnectionFactory();
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ // Test sending to Queue A
+ // 1st send should not block.
+ fillQueue(queueA);
+
+ // Test sending to Queue B it should block.
+ // Since even though the it's queue limits have not been reached, the
+ // connection
+ // is blocked.
+ CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+ assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+ }
+
+    /**
+     * Floods the given queue from a background thread until the producer is
+     * detected as stalled (or a resource exception has been flagged), then
+     * asks the background thread to stop.
+     *
+     * @param queue the queue to fill
+     * @throws JMSException declared for callers; not thrown by the visible body
+     * @throws InterruptedException if interrupted while waiting for the stall
+     */
+    private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
+        final AtomicBoolean stalled = new AtomicBoolean(true);
+        final AtomicBoolean running = new AtomicBoolean(true);
+
+        // The filler clears the stalled flag before every publish. Once a send
+        // starts to block, the flag is no longer cleared, which is what
+        // waitForBlockedOrResourceLimit() watches for.
+        Thread filler = new Thread("Fill thread.") {
+            public void run() {
+                Session fillSession = null;
+                try {
+                    fillSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer fillProducer = fillSession.createProducer(queue);
+                    fillProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    while (running.get()) {
+                        stalled.set(false);
+                        fillProducer.send(fillSession.createTextMessage("Hello World"));
+                    }
+                } catch (JMSException ignored) {
+                } finally {
+                    safeClose(fillSession);
+                }
+            }
+        };
+        filler.start();
+
+        waitForBlockedOrResourceLimit(stalled);
+        running.set(false);
+    }
+
+    /**
+     * Polls once per second until either the given flag is still set after a
+     * full second (no one cleared it, so the producer is taken to be blocked)
+     * or a resource exception has been flagged. After each unsuccessful check
+     * the flag is set again for the next round.
+     *
+     * @param done flag that a sending thread clears on every send
+     * @throws InterruptedException if interrupted while sleeping
+     */
+    protected void waitForBlockedOrResourceLimit(final AtomicBoolean done)
+        throws InterruptedException {
+        Thread.sleep(1000);
+        // the producer is blocked once the done flag stays true or there is a resource exception
+        while (!done.get() && !gotResourceException.get()) {
+            done.set(true);
+            Thread.sleep(1000);
+        }
+    }
+
+    /**
+     * Sends one non-persistent text message to the given queue from a new
+     * background thread.
+     *
+     * @param queue the destination queue
+     * @param message the text of the message to send
+     * @return a latch that is counted down only after the send has returned
+     *         normally; it stays at one if the send blocks or throws
+     * @throws JMSException declared for callers; not thrown by the visible body
+     */
+    private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
+        final CountDownLatch sendCompleted = new CountDownLatch(1);
+        Thread sender = new Thread("Send thread.") {
+            public void run() {
+                Session sendSession = null;
+                try {
+                    sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer sendProducer = sendSession.createProducer(queue);
+                    sendProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    sendProducer.send(sendSession.createTextMessage(message));
+                    sendCompleted.countDown();
+                } catch (JMSException ignored) {
+                } finally {
+                    safeClose(sendSession);
+                }
+            }
+        };
+        sender.start();
+        return sendCompleted;
+    }
+
+    /**
+     * Builds a non-persistent, JMX-less broker whose default destination policy
+     * has a memory limit of 1, and exposes it on a TCP connector.
+     *
+     * @return the configured broker service
+     * @throws Exception if the TCP connector cannot be added
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+
+        // Default policy for every destination: a memory limit of 1 with VM-based
+        // pending message storage, so a destination takes only one message at a time.
+        PolicyMap destinationPolicies = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setMemoryLimit(1);
+        defaultEntry.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+        defaultEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        destinationPolicies.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(destinationPolicies);
+
+        // Port 0 in the connector URI; the resulting connector is kept for building client factories.
+        connector = broker.addConnector("tcp://localhost:0");
+        return broker;
+    }
+
+ /**
+ * Enables auto-fail and then runs the superclass set-up.
+ *
+ * @throws Exception if the superclass set-up fails
+ */
+ public void setUp() throws Exception {
+ // NOTE(review): presumably makes a hung test fail instead of blocking forever - confirm in the superclass
+ setAutoFail(true);
+ super.setUp();
+ }
+
+    /**
+     * Force-closes the test connection's transport, then always runs the
+     * superclass tear-down.
+     *
+     * The transport listener is handed an IOException and the transport is
+     * stopped directly. The superclass tear-down runs in a finally block so it
+     * is not skipped when no connection was created or when the forced
+     * shutdown throws.
+     *
+     * @throws Exception if stopping the transport or the superclass tear-down fails
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class);
+                // guard against a transport chain that contains no TcpTransport
+                if (t != null) {
+                    t.getTransportListener().onException(new IOException("Disposed."));
+                }
+                connection.getTransport().stop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Creates a connection factory that targets the connect URI of the
+     * broker's connector.
+     *
+     * @return a new ActiveMQ connection factory
+     * @throws Exception if the connect URI cannot be obtained
+     */
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri());
+        return factory;
+    }
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * Test cases used to test the JMS redelivery policy (redelivery delays, back-off and dead-letter handling).
+ *
+ * @version $Revision$
+ */
+public class RedeliveryPolicyTest extends JmsTestSupport {
+
+ public static Test suite() {
+ return suite(RedeliveryPolicyTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+    /**
+     * With exponential back-off enabled (500 ms initial delay, multiplier 2),
+     * checks that the wait before a rolled-back message is redelivered grows
+     * with each further rollback.
+     *
+     * @throws Exception
+     */
+    public void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
+
+        // Configure redelivery on the connection before it is started.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(500);
+        redelivery.setBackOffMultiplier((short) 2);
+        redelivery.setUseExponentialBackOff(true);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue(getName());
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage delivered = (TextMessage)receiver.receive(1000);
+        assertNotNull(delivered);
+        assertEquals("1st", delivered.getText());
+        txSession.rollback();
+
+        // The first rollback is followed by an immediate redelivery.
+        delivered = (TextMessage)receiver.receive(100);
+        assertNotNull(delivered);
+        txSession.rollback();
+
+        // After the second rollback nothing arrives within 100 ms...
+        delivered = (TextMessage)receiver.receive(100);
+        assertNull(delivered);
+
+        // ...but the message shows up within a further 700 ms.
+        delivered = (TextMessage)receiver.receive(700);
+        assertNotNull(delivered);
+        assertEquals("1st", delivered.getText());
+        txSession.rollback();
+
+        // After the third rollback the wait is longer still: nothing within
+        // 100 ms, nor within a further 500 ms, then delivery within 700 ms more.
+        delivered = (TextMessage)receiver.receive(100);
+        assertNull(delivered);
+        delivered = (TextMessage)receiver.receive(500);
+        assertNull(delivered);
+        delivered = (TextMessage)receiver.receive(700);
+        assertNotNull(delivered);
+        assertEquals("1st", delivered.getText());
+
+    }
+
+
+    /**
+     * With only an initial redelivery delay of 500 ms configured (exponential
+     * back-off is not enabled by this test), checks that a rolled-back message
+     * is redelivered after the same wait on each later rollback.
+     *
+     * @throws Exception
+     */
+    public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
+
+        // Configure redelivery on the connection before it is started.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(500);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue(getName());
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage delivered = (TextMessage)receiver.receive(1000);
+        assertNotNull(delivered);
+        assertEquals("1st", delivered.getText());
+        txSession.rollback();
+
+        // The first rollback is followed by an immediate redelivery.
+        delivered = (TextMessage)receiver.receive(100);
+        assertNotNull(delivered);
+        txSession.rollback();
+
+        // After the second rollback nothing arrives within 100 ms, but the
+        // message shows up within a further 700 ms.
+        delivered = (TextMessage)receiver.receive(100);
+        assertNull(delivered);
+        delivered = (TextMessage)receiver.receive(700);
+        assertNotNull(delivered);
+        assertEquals("1st", delivered.getText());
+        txSession.rollback();
+
+        // Without exponential back-off the same timing holds again after the
+        // third rollback.
+        delivered = (TextMessage)receiver.receive(100);
+        assertNull(delivered);
+        delivered = (TextMessage)receiver.receive(700);
+        assertNotNull(delivered);
+        assertEquals("1st", delivered.getText());
+
+    }
+
+    /**
+     * With at most two redeliveries allowed, checks that a message rolled back
+     * three times stops being delivered to the consumer and can instead be
+     * read from the "ActiveMQ.DLQ" queue.
+     *
+     * @throws Exception
+     */
+    public void testDLQHandling() throws Exception {
+
+        // Configure redelivery on the connection before it is started.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(100);
+        redelivery.setUseExponentialBackOff(false);
+        redelivery.setMaximumRedeliveries(2);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+        MessageConsumer deadLetterReceiver = txSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage delivered;
+
+        // The original delivery plus the two permitted redeliveries, each rolled back.
+        for (int attempt = 0; attempt < 3; attempt++) {
+            delivered = (TextMessage)receiver.receive(1000);
+            assertNotNull(delivered);
+            assertEquals("1st", delivered.getText());
+            txSession.rollback();
+        }
+
+        // The last rollback should have sent the 1st message to the DLQ, so
+        // the consumer now sees the 2nd message.
+        delivered = (TextMessage)receiver.receive(1000);
+        assertNotNull(delivered);
+        assertEquals("2nd", delivered.getText());
+        txSession.commit();
+
+        // The 1st message should now be readable from the DLQ.
+        delivered = (TextMessage)deadLetterReceiver.receive(1000);
+        assertNotNull(delivered);
+        assertEquals("1st", delivered.getText());
+        txSession.commit();
+
+    }
+
+
+    /**
+     * With the maximum number of redeliveries set to -1 (no maximum), checks
+     * that a message keeps being redelivered after every rollback until the
+     * session finally commits.
+     *
+     * @throws Exception
+     */
+    public void testInfiniteMaximumNumberOfRedeliveries() throws Exception {
+
+        // Configure redelivery on the connection before it is started.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(100);
+        redelivery.setUseExponentialBackOff(false);
+        // -1 means there is no maximum (i.e. infinite redeliveries)
+        redelivery.setMaximumRedeliveries(-1);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage delivered;
+
+        // The 1st message is delivered and rolled back five times in a row;
+        // it should keep coming back until a commit is issued.
+        for (int attempt = 0; attempt < 5; attempt++) {
+            delivered = (TextMessage)receiver.receive(1000);
+            assertNotNull(delivered);
+            assertEquals("1st", delivered.getText());
+            txSession.rollback();
+        }
+
+        // Sixth delivery of the 1st message: this time it is committed.
+        delivered = (TextMessage)receiver.receive(1000);
+        assertNotNull(delivered);
+        assertEquals("1st", delivered.getText());
+        txSession.commit();
+
+        // Only now does the 2nd message arrive.
+        delivered = (TextMessage)receiver.receive(1000);
+        assertNotNull(delivered);
+        assertEquals("2nd", delivered.getText());
+        txSession.commit();
+
+    }
+
+    /**
+     * With the maximum number of redeliveries set to 0, checks that a
+     * rolled-back message is not delivered again and the next message is
+     * received instead.
+     *
+     * @throws Exception
+     */
+    public void testZeroMaximumNumberOfRedeliveries() throws Exception {
+
+        // Configure redelivery on the connection before it is started.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(100);
+        redelivery.setUseExponentialBackOff(false);
+        // no redeliveries at all
+        redelivery.setMaximumRedeliveries(0);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage delivered = (TextMessage)receiver.receive(1000);
+        assertNotNull(delivered);
+        assertEquals("1st", delivered.getText());
+        txSession.rollback();
+
+        // The 1st message must not come back, since maximumRedeliveries is 0;
+        // the next delivery is the 2nd message.
+        delivered = (TextMessage)receiver.receive(1000);
+        assertNotNull(delivered);
+        assertEquals("2nd", delivered.getText());
+        txSession.commit();
+
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ConsumerReceiveWithTimeoutTest extends TestSupport {
+
+ private Connection connection;
+
+ /**
+ * Runs the superclass set-up and then creates the connection used by the test.
+ *
+ * @throws Exception if the superclass set-up or connection creation fails
+ */
+ protected void setUp() throws Exception {
+ super.setUp();
+ connection = createConnection();
+ }
+
+    /**
+     * Closes the test connection, if one was created, and always runs the
+     * superclass tear-down, even when closing the connection throws.
+     *
+     * @throws Exception if closing the connection or the superclass tear-down fails
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            // drop the reference and let the superclass clean up regardless of close() failing
+            connection = null;
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Checks that a consumer already waiting inside receive(timeout) is woken
+     * up when a message is sent to its queue afterwards.
+     *
+     * A background thread sends the message after a 10 second pause, while the
+     * test thread waits in receive for up to 60 seconds.
+     *
+     * @throws javax.jms.JMSException
+     */
+    public void testConsumerReceiveBeforeMessageDispatched() throws JMSException {
+
+        connection.start();
+
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue testQueue = jmsSession.createQueue("test");
+
+        Thread delayedSender = new Thread() {
+            public void run() {
+                try {
+                    // pause for 10 seconds so that consumer.receive is
+                    // already running when the message is sent
+                    Thread.sleep(10000);
+                    jmsSession.createProducer(testQueue).send(jmsSession.createTextMessage("Hello"));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        delayedSender.start();
+
+        // Block in receive until the delayed message arrives (or 60 seconds pass).
+        MessageConsumer receiver = jmsSession.createConsumer(testQueue);
+        Message received = receiver.receive(60000);
+        assertNotNull(received);
+        jmsSession.close();
+
+    }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.store.PersistenceAdapter;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public abstract class DurableSubscriptionTestSupport extends TestSupport {
+
+ // Client connection; assigned by createBroker()/createRestartedBroker() and by
+ // tests that reconnect, closed by destroyBroker().
+ private Connection connection;
+ // Session, durable subscriber and producer assigned by the test methods.
+ private Session session;
+ private TopicSubscriber consumer;
+ private MessageProducer producer;
+ // Broker started by createBroker()/createRestartedBroker() and stopped by destroyBroker().
+ private BrokerService broker;
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory("vm://durable-broker");
+ }
+
+ /**
+  * Obtains a connection from the superclass and sets the name of the running
+  * test as its JMS client ID.
+  */
+ protected Connection createConnection() throws Exception {
+     Connection created = super.createConnection();
+     String clientId = getName();
+     created.setClientID(clientId);
+     return created;
+ }
+
+ /**
+  * Starts the broker (and opens the test connection) before handing over to
+  * the superclass set-up.
+  */
+ @Override
+ protected void setUp() throws Exception {
+     // The broker has to exist before the superclass runs its own set-up.
+     createBroker();
+     super.setUp();
+ }
+
+ /**
+  * Runs the superclass tear-down and then closes the connection and stops the
+  * broker. The broker is destroyed even when the superclass tear-down throws,
+  * so a failing test cannot leave a running broker behind.
+  *
+  * @throws Exception if the superclass tear-down or the broker shutdown fails
+  */
+ protected void tearDown() throws Exception {
+     try {
+         super.tearDown();
+     } finally {
+         destroyBroker();
+     }
+ }
+
+ /**
+  * Closes the current connection, stops the broker and then starts a new
+  * broker that keeps the stored messages. A new connection is assigned to the
+  * connection field; the tests in this class call connection.start() again
+  * afterwards.
+  */
+ protected void restartBroker() throws Exception {
+     destroyBroker();
+     // Bring the broker back up without wiping the message store.
+     createRestartedBroker();
+ }
+
+ /**
+  * Starts the broker for a fresh test run, deleting any previously stored
+  * messages, and opens the test connection.
+  *
+  * @throws Exception if the broker cannot be started or the connection cannot be created
+  */
+ private void createBroker() throws Exception {
+     startDurableBroker(true);
+ }
+
+ /**
+  * Starts a replacement broker that keeps the messages already in the store
+  * and opens a new test connection.
+  *
+  * @throws Exception if the broker cannot be started or the connection cannot be created
+  */
+ private void createRestartedBroker() throws Exception {
+     startDurableBroker(false);
+ }
+
+ /**
+  * Configures and starts a persistent broker named "durable-broker" using the
+  * adapter from createPersistenceAdapter(), then assigns a new connection to
+  * the connection field.
+  * <p>
+  * Start-up failures are propagated instead of being printed and ignored, so a
+  * broker that failed to start makes the test fail at the real cause rather
+  * than continuing against a missing broker.
+  *
+  * @param deleteAllMessagesOnStartup value passed to the broker's
+  *            setDeleteAllMessagesOnStartup
+  * @throws Exception if the broker cannot be configured or started, or the
+  *             connection cannot be created
+  */
+ private void startDurableBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+     broker = new BrokerService();
+     broker.setBrokerName("durable-broker");
+     broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+     broker.setPersistenceAdapter(createPersistenceAdapter());
+     broker.setPersistent(true);
+     broker.start();
+
+     connection = createConnection();
+ }
+
+ /**
+  * Closes the test connection and stops the broker, if they exist.
+  * <p>
+  * The broker is stopped even when closing the connection throws, and both
+  * fields are cleared so that a later call does not touch a connection or
+  * broker that has already been shut down.
+  *
+  * @throws Exception if closing the connection or stopping the broker fails
+  */
+ private void destroyBroker() throws Exception {
+     try {
+         if (connection != null) {
+             connection.close();
+         }
+     } finally {
+         connection = null;
+         if (broker != null) {
+             try {
+                 broker.stop();
+             } finally {
+                 broker = null;
+             }
+         }
+     }
+ }
+
+ /**
+  * Supplies the persistence adapter that is set on the broker each time this
+  * class creates one (on initial start and on restart).
+  */
+ protected abstract PersistenceAdapter createPersistenceAdapter() throws Exception;
+
+ /**
+  * Unsubscribes a durable subscription while a message is pending, then
+  * re-creates it under the same name on a new connection and expects the
+  * first message received to be the one sent after re-creation.
+  */
+ public void testUnsubscribeSubscription() throws Exception {
+     final String subscriptionName = "sub1";
+
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Topic topic = session.createTopic("TestTopic");
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+     producer = session.createProducer(topic);
+     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+     connection.start();
+
+     // With the subscriber active, a published message is delivered.
+     TextMessage first = session.createTextMessage("Msg:1");
+     producer.send(first);
+     Message received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:1", received);
+
+     // Close the subscriber, publish while it is inactive, then drop the subscription.
+     consumer.close();
+     TextMessage second = session.createTextMessage("Msg:2");
+     producer.send(second);
+     session.unsubscribe(subscriptionName);
+
+     // Start over on a brand-new connection.
+     connection.close();
+     connection = createConnection();
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     producer = session.createProducer(topic);
+     connection.start();
+
+     // Re-create the subscription and publish once more.
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+     TextMessage third = session.createTextMessage("Msg:3");
+     producer.send(third);
+
+     // The first message seen must be the one sent after re-creation.
+     received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:3", received);
+ }
+
+ /**
+  * Publishes a message while the durable subscriber is closed and expects it
+  * to be delivered when the subscription is reactivated on a new connection.
+  */
+ public void testInactiveDurableSubscriptionTwoConnections() throws Exception {
+     final String subscriptionName = "sub1";
+
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Topic topic = session.createTopic("TestTopic");
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+     producer = session.createProducer(topic);
+     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+     connection.start();
+
+     // With the subscriber active, a published message is delivered.
+     TextMessage first = session.createTextMessage("Msg:1");
+     producer.send(first);
+     Message received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:1", received);
+
+     // Close the subscriber and publish while it is inactive.
+     consumer.close();
+     TextMessage second = session.createTextMessage("Msg:2");
+     producer.send(second);
+
+     // Switch to a second connection.
+     connection.close();
+     connection = createConnection();
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     connection.start();
+
+     // Reactivating the subscription must yield the message sent in the meantime.
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+     received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:2", received);
+ }
+
+ /**
+  * Publishes a message while the durable subscriber is closed, restarts the
+  * broker, and expects exactly that one message when the subscription is
+  * reactivated.
+  */
+ public void testInactiveDurableSubscriptionBrokerRestart() throws Exception {
+     final String subscriptionName = "sub1";
+
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Topic topic = session.createTopic("TestTopic");
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+     producer = session.createProducer(topic);
+     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+     connection.start();
+
+     // With the subscriber active, a published message is delivered.
+     TextMessage first = session.createTextMessage("Msg:1");
+     producer.send(first);
+     Message received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:1", received);
+
+     // Close the subscriber and publish while it is inactive.
+     consumer.close();
+     TextMessage second = session.createTextMessage("Msg:2");
+     producer.send(second);
+
+     // Restart the broker; this also replaces the connection.
+     restartBroker();
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     connection.start();
+
+     // Reactivate the subscription.
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+
+     // The pending message arrives, and nothing else follows it.
+     received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:2", received);
+     Message extra = consumer.receive(5000);
+     assertNull(extra);
+ }
+
+ /**
+  * Creates a durable subscription, restarts the broker, and expects messages
+  * published after the restart (one before and one after the subscriber is
+  * reactivated) to be delivered in order, with nothing following them.
+  */
+ public void testDurableSubscriptionPersistsPastBrokerRestart() throws Exception {
+     final String subscriptionName = "sub1";
+
+     // Register the durable subscription.
+     connection.start();
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+     // The retroactive option asks for messages sent before the consumer existed.
+     Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+
+     // Bounce the broker; this also replaces the connection.
+     restartBroker();
+
+     // Reconnect and set up a persistent producer.
+     connection.start();
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     producer = session.createProducer(topic);
+     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+     // Publish once while the subscriber is still inactive.
+     TextMessage first = session.createTextMessage("Msg:1");
+     producer.send(first);
+
+     // Reactivate the subscription.
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+
+     // Publish again, now with the subscriber active.
+     TextMessage second = session.createTextMessage("Msg:2");
+     producer.send(second);
+
+     // Both messages arrive in order, and nothing else follows.
+     Message received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:1", received);
+     received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:2", received);
+
+     Message extra = consumer.receive(5000);
+     assertNull(extra);
+ }
+
+ /**
+  * Publishes a message while the durable subscriber is closed and expects it
+  * when the subscription is reactivated on the same connection and session.
+  * The method name does not start with "test".
+  */
+ public void xtestInactiveDurableSubscriptionOneConnection() throws Exception {
+     final String subscriptionName = "sub1";
+
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Topic topic = session.createTopic("TestTopic");
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+     producer = session.createProducer(topic);
+     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+     connection.start();
+
+     // With the subscriber active, a published message is delivered.
+     TextMessage first = session.createTextMessage("Msg:1");
+     producer.send(first);
+     Message received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:1", received);
+
+     // Close the subscriber and publish while it is inactive.
+     consumer.close();
+     TextMessage second = session.createTextMessage("Msg:2");
+     producer.send(second);
+
+     // Reactivate on the same session and expect the pending message.
+     consumer = session.createDurableSubscriber(topic, subscriptionName);
+     received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:2", received);
+ }
+
+ /**
+  * Creates a durable subscription with a selector, checks that only matching
+  * messages are delivered, then re-creates the subscription under the same
+  * name with a different selector and checks that the new selector applies.
+  */
+ public void testSelectorChange() throws Exception {
+     final String subscriptionName = "sub1";
+     final String colorProperty = "color";
+
+     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Topic topic = session.createTopic("TestTopic");
+     consumer = session.createDurableSubscriber(topic, subscriptionName, "color='red'", false);
+     producer = session.createProducer(topic);
+     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+     connection.start();
+
+     // One message object is reused for every send; only its text and color change.
+     TextMessage colored = session.createTextMessage();
+     colored.setText("Msg:1");
+     colored.setStringProperty(colorProperty, "blue");
+     producer.send(colored);
+     colored.setText("Msg:2");
+     colored.setStringProperty(colorProperty, "red");
+     producer.send(colored);
+
+     // Only the red message passes the first selector.
+     Message received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:2", received);
+
+     // Re-create the subscription with a selector for blue.
+     consumer.close();
+     consumer = session.createDurableSubscriber(topic, subscriptionName, "color='blue'", false);
+
+     // Publish one red and one blue message.
+     colored.setText("Msg:3");
+     colored.setStringProperty(colorProperty, "red");
+     producer.send(colored);
+     colored.setText("Msg:4");
+     colored.setStringProperty(colorProperty, "blue");
+     producer.send(colored);
+
+     // Now the blue message is the one that is delivered.
+     received = consumer.receive(5000);
+     assertTextMessageEquals("Msg:4", received);
+ }
+
+ /**
+  * Creates a durable subscription in one session, closes that session, then
+  * publishes from a second session on the same connection and expects the
+  * message once the subscription is reactivated there.
+  */
+ public void testDurableSubWorksInNewSession() throws JMSException {
+     final String subscriptionName = "sub1";
+
+     // Register the subscription from a first session.
+     connection.start();
+     Session firstSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+     Topic topic = firstSession.createTopic("topic-" + getName());
+     MessageConsumer subscriber = firstSession.createDurableSubscriber(topic, subscriptionName);
+
+     // Drain any messages that may already be in the subscription.
+     Message drained;
+     do {
+         drained = subscriber.receive(1000);
+     } while (drained != null);
+
+     // Continue in a new session.
+     firstSession.close();
+     Session secondSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+     // Publish a message that should be kept for the durable subscription.
+     MessageProducer sender = createProducer(secondSession, topic);
+     sender.send(secondSession.createTextMessage("Message 1"));
+
+     // Reactivate the subscription and pick the message up.
+     subscriber = secondSession.createDurableSubscriber(topic, subscriptionName);
+     Message received = subscriber.receive(1000);
+     assertNotNull(received);
+     assertEquals("Message 1", ((TextMessage)received).getText());
+ }
+
+ /**
+  * Creates a durable subscription on one connection, closes that connection,
+  * then publishes from a new connection and expects the message once the
+  * subscription is reactivated there.
+  */
+ public void testDurableSubWorksInNewConnection() throws Exception {
+     final String subscriptionName = "sub1";
+
+     // Register the subscription on the initial connection.
+     connection.start();
+     Session firstSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+     Topic topic = firstSession.createTopic("topic-" + getName());
+     MessageConsumer subscriber = firstSession.createDurableSubscriber(topic, subscriptionName);
+
+     // Drain any messages that may already be in the subscription.
+     Message drained;
+     do {
+         drained = subscriber.receive(1000);
+     } while (drained != null);
+
+     // See if the durable sub works in a new connection: the old connection is
+     // closed first, then a replacement is opened and started.
+     connection.close();
+     connection = createConnection();
+     connection.start();
+     Session secondSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+     // Publish a message that should be kept for the durable subscription.
+     MessageProducer sender = createProducer(secondSession, topic);
+     sender.send(secondSession.createTextMessage("Message 1"));
+
+     // Reactivate the subscription and pick the message up.
+     subscriber = secondSession.createDurableSubscriber(topic, subscriptionName);
+     Message received = subscriber.receive(1000);
+     assertNotNull(received);
+     assertEquals("Message 1", ((TextMessage)received).getText());
+ }
+
+ /**
+  * Creates a producer for the given destination on the given session and
+  * applies the delivery mode returned by getDeliveryMode().
+  */
+ private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
+     MessageProducer created = session.createProducer(queue);
+     int mode = getDeliveryMode();
+     created.setDeliveryMode(mode);
+     return created;
+ }
+
+ protected int getDeliveryMode() {
+ return DeliveryMode.PERSISTENT;
+ }
+
+ /**
+  * Asserts that the message is a non-null TextMessage whose text equals the
+  * expected string.
+  */
+ private void assertTextMessageEquals(String string, Message message) throws JMSException {
+     assertNotNull("Message was null", message);
+     boolean isText = message instanceof TextMessage;
+     assertTrue("Message is not a TextMessage", isText);
+     TextMessage textMessage = (TextMessage)message;
+     assertEquals(string, textMessage.getText());
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Variant of JmsTopicRedeliverTest that sets the inherited {@code durable}
+ * flag before set-up and adds a redelivery check across consumer sessions.
+ *
+ * @version $Revision: 1.4 $
+ */
+public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest {
+
+    private static final Log LOG = LogFactory.getLog(JMSDurableTopicRedeliverTest.class);
+
+    /**
+     * Sets the durable flag and then runs the superclass set-up.
+     */
+    protected void setUp() throws Exception {
+        durable = true;
+        super.setUp();
+    }
+
+    /**
+     * Sends one message, receives it without acknowledging, and then expects
+     * the same message to be redelivered to a consumer on a new session, where
+     * it is acknowledged. A third consumer must then receive nothing.
+     * <p>
+     * All assertEquals calls pass the expected value first so that failure
+     * messages report expected and actual the right way round.
+     *
+     * @throws Exception
+     */
+    public void testRedeliverNewSession() throws Exception {
+        String text = "TEST: " + System.currentTimeMillis();
+        Message sendMessage = session.createTextMessage(text);
+
+        if (verbose) {
+            LOG.info("About to send a message: " + sendMessage + " with text: " + text);
+        }
+        producer.send(producerDestination, sendMessage);
+
+        // receive but don't acknowledge
+        Message unackMessage = consumer.receive(1000);
+        assertNotNull(unackMessage);
+        String unackId = unackMessage.getJMSMessageID();
+        assertEquals(text, ((TextMessage)unackMessage).getText());
+        assertFalse(unackMessage.getJMSRedelivered());
+        assertEquals(1, unackMessage.getIntProperty("JMSXDeliveryCount"));
+        consumeSession.close();
+        consumer.close();
+
+        // receive then acknowledge
+        consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = createConsumer();
+        Message ackMessage = consumer.receive(1000);
+        assertNotNull(ackMessage);
+        ackMessage.acknowledge();
+        String ackId = ackMessage.getJMSMessageID();
+        assertEquals(text, ((TextMessage)ackMessage).getText());
+        assertTrue(ackMessage.getJMSRedelivered());
+        assertEquals(2, ackMessage.getIntProperty("JMSXDeliveryCount"));
+        // Same message both times: the redelivery carries the original message ID.
+        assertEquals(unackId, ackId);
+        consumeSession.close();
+        consumer.close();
+
+        // After the acknowledgement nothing should be left to deliver.
+        consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = createConsumer();
+        assertNull(consumer.receive(1000));
+    }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java
------------------------------------------------------------------------------
svn:executable = *