You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 14:42:29 UTC

svn commit: r786040 [3/6] - in /activemq/sandbox/activemq-flow: activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/test/java/org/apache/activemq/legacy/broker/ activemq-all/src/test/java/org/apache/activemq/legacy/broker/advisory/ ...

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.apollo.CombinationTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.legacy.broker.BrokerFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+
+/**
+ * Test cases used to test the JMS message consumer.
+ * 
+ * @version $Revision$
+ */
+public class JmsTestSupport extends CombinationTestSupport {
+
+    static final private AtomicLong TEST_COUNTER = new AtomicLong();
+    public String userName;
+    public String password;
+
+    protected ConnectionFactory factory;
+    protected ActiveMQConnection connection;
+    protected BrokerService broker;
+
+    protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
+
+    // /////////////////////////////////////////////////////////////////
+    //
+    // Test support methods.
+    //
+    // /////////////////////////////////////////////////////////////////
+    protected ActiveMQDestination createDestination(Session session, byte type) throws JMSException {
+        String testMethod = getName();
+        if( testMethod.indexOf(" ")>0 ) {
+            testMethod = testMethod.substring(0, testMethod.indexOf(" "));
+        }
+        String name = "TEST." + getClass().getName() + "." +testMethod+"."+TEST_COUNTER.getAndIncrement();
+        switch (type) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            return (ActiveMQDestination)session.createQueue(name);
+        case ActiveMQDestination.TOPIC_TYPE:
+            return (ActiveMQDestination)session.createTopic(name);
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            return (ActiveMQDestination)session.createTemporaryQueue();
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            return (ActiveMQDestination)session.createTemporaryTopic();
+        default:
+            throw new IllegalArgumentException("type: " + type);
+        }
+    }
+
+    protected void sendMessages(Destination destination, int count) throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        Connection connection = factory.createConnection();
+        connection.start();
+        sendMessages(connection, destination, count);
+        connection.close();
+    }
+
+    protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        sendMessages(session, destination, count);
+        session.close();
+    }
+
+    protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < count; i++) {
+            producer.send(session.createTextMessage("" + i));
+        }
+        producer.close();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        if (System.getProperty("basedir") == null) {
+            File file = new File(".");
+            System.setProperty("basedir", file.getAbsolutePath());
+        }
+
+        broker = createBroker();
+        broker.start();
+        factory = createConnectionFactory();
+        connection = (ActiveMQConnection)factory.createConnection(userName, password);
+        connections.add(connection);
+    }
+
+    protected void tearDown() throws Exception {
+        for (Iterator iter = connections.iterator(); iter.hasNext();) {
+            Connection conn = (Connection)iter.next();
+            try {
+                conn.close();
+            } catch (Throwable e) {
+            }
+            iter.remove();
+        }
+        broker.stop();
+        super.tearDown();
+    }
+
+    protected void safeClose(Connection c) {
+        try {
+            c.close();
+        } catch (Throwable e) {
+        }
+    }
+
+    protected void safeClose(Session s) {
+        try {
+            s.close();
+        } catch (Throwable e) {
+        }
+    }
+
+    protected void safeClose(MessageConsumer c) {
+        try {
+            c.close();
+        } catch (Throwable e) {
+        }
+    }
+
+    protected void safeClose(MessageProducer p) {
+        try {
+            p.close();
+        } catch (Throwable e) {
+        }
+    }
+
+    protected void profilerPause(String prompt) throws IOException {
+        if (System.getProperty("profiler") != null) {
+            pause(prompt);
+        }
+    }
+
+    protected void pause(String prompt) throws IOException {
+        System.out.println();
+        System.out.println(prompt + "> Press enter to continue: ");
+        while (System.in.read() != '\n') {
+        }
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsTestSupport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.legacy.broker.BrokerFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.TransportConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Small burn test moves sends a moderate amount of messages through the broker,
+ * to checking to make sure that the broker does not lock up after a while of
+ * sustained messaging.
+ * 
+ * @version $Revision$
+ */
+public class LoadTestBurnIn extends JmsTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(LoadTestBurnIn.class);
+
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+    public byte destinationType;
+    public boolean durableConsumer;
+    public int messageCount = 50000;
+    public int messageSize = 1024;
+
+    public static Test suite() {
+        return suite(LoadTestBurnIn.class);
+    }
+
+    protected void setUp() throws Exception {
+        LOG.info("Start: " + getName());
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } catch (Throwable e) {
+            e.printStackTrace(System.out);
+        } finally {
+            LOG.info("End: " + getName());
+        }
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://(tcp://localhost:0)?useJmx=true"));
+        // return BrokerFactory.createBroker(new
+        // URI("xbean:org/apache/activemq/broker/store/loadtester.xml"));
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        return new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0))
+            .getServer().getConnectURI());
+    }
+
+    public void initCombosForTestSendReceive() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+        addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE});
+        addCombinationValues("messageSize", new Object[] {Integer.valueOf(101), Integer.valueOf(102),
+                                                          Integer.valueOf(103), Integer.valueOf(104),
+                                                          Integer.valueOf(105), Integer.valueOf(106),
+                                                          Integer.valueOf(107), Integer.valueOf(108)});
+    }
+
+    public void testSendReceive() throws Exception {
+
+        // Durable consumer combination is only valid with topics
+        if (durableConsumer && destinationType != ActiveMQDestination.TOPIC_TYPE) {
+            return;
+        }
+
+        connection.setClientID(getName());
+        connection.getPrefetchPolicy().setAll(1000);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer;
+        if (durableConsumer) {
+            consumer = session.createDurableSubscriber((Topic)destination, "sub1:"
+                                                                           + System.currentTimeMillis());
+        } else {
+            consumer = session.createConsumer(destination);
+        }
+        profilerPause("Ready: ");
+
+        final CountDownLatch producerDoneLatch = new CountDownLatch(1);
+
+        // Send the messages, async
+        new Thread() {
+            public void run() {
+                Connection connection2 = null;
+                try {
+                    connection2 = factory.createConnection();
+                    Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(deliveryMode);
+                    for (int i = 0; i < messageCount; i++) {
+                        BytesMessage m = session.createBytesMessage();
+                        m.writeBytes(new byte[messageSize]);
+                        producer.send(m);
+                    }
+                    producer.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                } finally {
+                    safeClose(connection2);
+                    producerDoneLatch.countDown();
+                }
+
+            }
+        }.start();
+
+        // Make sure all the messages were delivered.
+        Message message = null;
+        for (int i = 0; i < messageCount; i++) {
+            message = consumer.receive(5000);
+            assertNotNull("Did not get message: " + i, message);
+        }
+
+        profilerPause("Done: ");
+
+        assertNull(consumer.receiveNoWait());
+        message.acknowledge();
+
+        // Make sure the producer thread finishes.
+        assertTrue(producerDoneLatch.await(5, TimeUnit.SECONDS));
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/LoadTestBurnIn.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupDelayedTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupDelayedTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupDelayedTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupDelayedTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.TransportConnector;
+import org.apache.activemq.legacy.broker.region.policy.PolicyEntry;
+import org.apache.activemq.legacy.broker.region.policy.PolicyMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+
+/**
+ * Sends 30 messages spread over three message groups (A, B and C) to a queue
+ * and consumes them with three concurrent workers. Unless both
+ * {@code consumersBeforeDispatchStarts} and {@code timeBeforeDispatchStarts}
+ * are 0, the test asserts that every worker received exactly 10 messages, all
+ * from a single group.
+ * <p>
+ * This class manages its own broker and connection: {@link #setUp()} and
+ * {@link #tearDown()} do not delegate to the superclass, and the
+ * {@code broker} and {@code connection} fields declared here hide the
+ * inherited ones.
+ */
+public class MessageGroupDelayedTest extends JmsTestSupport {
+  public static final Log log = LogFactory.getLog(MessageGroupDelayedTest.class);
+  protected Connection connection;
+  protected Session session;
+  protected MessageProducer producer;
+  protected Destination destination;
+
+  public int consumersBeforeDispatchStarts;
+  public int timeBeforeDispatchStarts;
+
+  BrokerService broker;
+  protected TransportConnector connector;
+
+  protected HashMap<String, Integer> messageCount = new HashMap<String, Integer>();
+  protected HashMap<String, Set<String>> messageGroups = new HashMap<String, Set<String>>();
+
+  public static Test suite() {
+      return suite(MessageGroupDelayedTest.class);
+  }
+
+  public static void main(String[] args) {
+      junit.textui.TestRunner.run(suite());
+  }
+
+  /** Starts the broker and opens a connection (prefetch 1), session and producer for "test-queue2". */
+  public void setUp() throws Exception {
+    broker = createBroker();
+    broker.start();
+    ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=1");
+    connection = connFactory.createConnection();
+    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    destination = new ActiveMQQueue("test-queue2");
+    producer = session.createProducer(destination);
+    connection.start();
+  }
+
+  /**
+   * Builds a non-persistent broker whose default destination policy carries
+   * the current {@code consumersBeforeDispatchStarts} and
+   * {@code timeBeforeDispatchStarts} values, and adds a TCP connector on an
+   * ephemeral port that is remembered in {@link #connector}.
+   */
+  protected BrokerService createBroker() throws Exception {
+      BrokerService service = new BrokerService();
+      service.setPersistent(false);
+      service.setUseJmx(false);
+
+      // Setup a destination policy where it takes only 1 message at a time.
+      PolicyMap policyMap = new PolicyMap();
+      PolicyEntry policy = new PolicyEntry();
+      log.info("testing with consumersBeforeDispatchStarts=" + consumersBeforeDispatchStarts + " and timeBeforeDispatchStarts=" + timeBeforeDispatchStarts);
+      policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
+      policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
+      policyMap.setDefaultEntry(policy);
+      service.setDestinationPolicy(policyMap);
+
+      connector = service.addConnector("tcp://localhost:0");
+      return service;
+  }
+
+  /**
+   * Closes the producer, session and connection and then stops the broker
+   * started by {@link #setUp()}, so that each combination run does not leave
+   * a broker running. The broker is stopped even when closing a JMS resource
+   * throws.
+   */
+  public void tearDown() throws Exception {
+      try {
+          if (producer != null) {
+              producer.close();
+          }
+          if (session != null) {
+              session.close();
+          }
+      } finally {
+          try {
+              if (connection != null) {
+                  connection.close();
+              }
+          } finally {
+              if (broker != null) {
+                  broker.stop();
+              }
+          }
+      }
+  }
+
+  public void initCombosForTestDelayedDirectConnectionListener() {
+      addCombinationValues("consumersBeforeDispatchStarts", new Object[] {0, 3, 5});
+      addCombinationValues("timeBeforeDispatchStarts", new Object[] {0, 100});
+  }
+
+  public void testDelayedDirectConnectionListener() throws Exception {
+
+    for (int i = 0; i < 10; i++) {
+      Message msga = session.createTextMessage("hello a");
+      msga.setStringProperty("JMSXGroupID", "A");
+      producer.send(msga);
+      Message msgb = session.createTextMessage("hello b");
+      msgb.setStringProperty("JMSXGroupID", "B");
+      producer.send(msgb);
+      Message msgc = session.createTextMessage("hello c");
+      msgc.setStringProperty("JMSXGroupID", "C");
+      producer.send(msgc);
+    }
+    log.info("30 messages sent to group A/B/C");
+
+    // Remaining message count per group (A, B, C); shared by all workers and
+    // also used by them as the lock that guards the shared bookkeeping.
+    int[] counters = {10, 10, 10};
+
+    final int workerCount = 3;
+    CountDownLatch startSignal = new CountDownLatch(1);
+    // One count per worker: each worker counts down only after its last
+    // bookkeeping update, so the results read below are complete and visible.
+    CountDownLatch doneSignal = new CountDownLatch(workerCount);
+
+    // Register every worker before any thread starts.
+    Thread[] threads = new Thread[workerCount];
+    for (int i = 0; i < workerCount; i++) {
+      String workerName = "worker" + (i + 1);
+      messageCount.put(workerName, 0);
+      messageGroups.put(workerName, new HashSet<String>());
+      Worker worker = new Worker(connection, destination, workerName, startSignal, doneSignal, counters, messageCount, messageGroups);
+      threads[i] = new Thread(worker, workerName);
+    }
+    for (int i = 0; i < workerCount; i++) {
+      threads[i].start();
+    }
+
+    startSignal.countDown();
+    // Bounded wait so a stuck worker fails the test instead of hanging it.
+    assertTrue("workers did not finish in time",
+        doneSignal.await(60, java.util.concurrent.TimeUnit.SECONDS));
+
+    // check results
+    if (consumersBeforeDispatchStarts == 0 && timeBeforeDispatchStarts == 0) {
+      log.info("Ignoring results because both parameters are 0");
+      return;
+    }
+
+    for (String worker: messageCount.keySet()) {
+      String summary = "worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker);
+      log.info(summary);
+      assertEquals(summary, 10, messageCount.get(worker).intValue());
+      assertEquals(summary, 1, messageGroups.get(worker).size());
+    }
+
+  }
+
+  /**
+   * Consumes messages from the queue until the shared per-group counters have
+   * all reached zero, recording under its own name how many messages it
+   * received and from which groups. All access to the shared counters and
+   * maps is synchronized on the {@code counters} array.
+   */
+  private static final class Worker implements Runnable {
+    private final Connection connection;
+    private final Destination queueName;
+    private final String workerName;
+    private final CountDownLatch startSignal;
+    private final CountDownLatch doneSignal;
+    private final int[] counters;
+    private final HashMap<String, Integer> messageCount;
+    private final HashMap<String, Set<String>> messageGroups;
+
+    private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>>messageGroups) {
+      this.connection = connection;
+      this.queueName = queueName;
+      this.workerName = workerName;
+      this.startSignal = startSignal;
+      this.doneSignal = doneSignal;
+      this.counters = counters;
+      this.messageCount = messageCount;
+      this.messageGroups = messageGroups;
+    }
+
+    /** Returns true once every group counter has reached zero. */
+    private boolean allConsumed() {
+      synchronized (counters) {
+        return counters[0] == 0 && counters[1] == 0 && counters[2] == 0;
+      }
+    }
+
+    /**
+     * Records one received message of the given group: decrements the
+     * group's counter and updates this worker's statistics in one step.
+     *
+     * @return how long to sleep afterwards in milliseconds (500 for A, 100
+     *         for B, 10 for C), or 0 when the group is unknown
+     */
+    private long record(String group) {
+      final int index;
+      final long pauseMillis;
+      if ("A".equals(group)) {
+        index = 0;
+        pauseMillis = 500;
+      } else if ("B".equals(group)) {
+        index = 1;
+        pauseMillis = 100;
+      } else if ("C".equals(group)) {
+        index = 2;
+        pauseMillis = 10;
+      } else {
+        log.warn("unknown group");
+        return 0;
+      }
+      synchronized (counters) {
+        --counters[index];
+        messageCount.put(workerName, messageCount.get(workerName) + 1);
+        messageGroups.get(workerName).add(group);
+      }
+      return pauseMillis;
+    }
+
+    public void run() {
+      Session sess = null;
+      MessageConsumer consumer = null;
+      try {
+        log.info(workerName);
+        startSignal.await();
+        sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = sess.createConsumer(queueName);
+
+        while (!allConsumed()) {
+          Message msg = consumer.receive(500);
+          if (msg == null) {
+            continue;
+          }
+
+          long pauseMillis = record(msg.getStringProperty("JMSXGroupID"));
+          if (pauseMillis > 0) {
+            Thread.sleep(pauseMillis);
+          }
+          // As before, a message is acknowledged only while messages are
+          // still outstanding after the pause.
+          if (!allConsumed()) {
+            msg.acknowledge();
+          }
+        }
+        log.info(workerName + " done...");
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.warn(workerName + " interrupted", e);
+      } catch (Exception e) {
+        log.error(workerName + " failed", e);
+      } finally {
+        try {
+          if (consumer != null) {
+            consumer.close();
+          }
+          if (sess != null) {
+            sess.close();
+          }
+        } catch (Exception e) {
+          log.warn(workerName + " could not close its consumer/session", e);
+        } finally {
+          // Always report completion, even after a failure, so the test
+          // thread is released.
+          doneSignal.countDown();
+        }
+      }
+    }
+  }
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/MessageGroupTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,164 @@
+package org.apache.activemq.legacy.test1;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.apollo.CombinationTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tests message-group behaviour on a queue: delivery of a group to a single
+ * consumer, the {@code JMSXGroupFirstForConsumer} flag, and closing a group by
+ * sending {@code JMSXGroupSeq = -1}.
+ */
+public class MessageGroupTest extends JmsTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(MessageGroupTest.class);
+
+    /**
+     * All messages of a group go to the first consumer; after that consumer
+     * is closed, the remaining message is delivered to the second consumer.
+     */
+    public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer1 = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages.
+        for (int i = 0; i < 4; i++) {
+            TextMessage message = session.createTextMessage("message " + i);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            message.setIntProperty("JMSXGroupSeq", i + 1);
+            LOG.info("sending message: " + message);
+            producer.send(message);
+        }
+
+        // All the messages should have been sent down connection 1.. just get
+        // the first 3
+        for (int i = 0; i < 3; i++) {
+            TextMessage m1 = (TextMessage)consumer1.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+            assertEquals(i + 1, m1.getIntProperty("JMSXGroupSeq"));
+        }
+
+        // Setup a second connection; registering it lets tearDown() close it.
+        Connection connection1 = factory.createConnection(userName, password);
+        connections.add(connection1);
+        connection1.start();
+        Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createConsumer(destination);
+
+        // Close the first consumer.
+        consumer1.close();
+
+        // The last message should now go to the second consumer.
+        TextMessage last = (TextMessage)consumer2.receive(500);
+        assertNotNull("m1 is null for index: " + 0, last);
+        assertEquals(4, last.getIntProperty("JMSXGroupSeq"));
+
+        //assert that there are no other messages left for the consumer 2
+        Message m = consumer2.receive(100);
+        assertNull("consumer 2 has some messages left", m);
+    }
+
+    /**
+     * A grouped message sent before any consumer exists is delivered to the
+     * consumer created afterwards, flagged as first of its group for it.
+     */
+    public void testAddingConsumer() throws Exception {
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(destination);
+
+        TextMessage message = session.createTextMessage("message");
+        message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+
+        LOG.info("sending message: " + message);
+        producer.send(message);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Bounded receive: a missing message fails the assertion below instead
+        // of blocking the whole test run.
+        TextMessage msg = (TextMessage)consumer.receive(5000);
+        assertNotNull(msg);
+        boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer");
+        assertTrue(first);
+    }
+
+    /**
+     * After a message with {@code JMSXGroupSeq = -1} closes the group, later
+     * messages of the same group id are delivered to the second consumer.
+     */
+    public void testClosingMessageGroup() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer1 = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages.
+        for (int i = 0; i < 4; i++) {
+            TextMessage message = session.createTextMessage("message " + i);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            LOG.info("sending message: " + message);
+            producer.send(message);
+        }
+
+        // All the messages should have been sent down consumer1.. just get
+        // the first 3
+        for (int i = 0; i < 3; i++) {
+            TextMessage m1 = (TextMessage)consumer1.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+        }
+
+        // Setup a second consumer; registering its connection lets tearDown()
+        // close it.
+        Connection connection1 = factory.createConnection(userName, password);
+        connections.add(connection1);
+        connection1.start();
+        Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createConsumer(destination);
+
+        //assert that there are no messages for the consumer 2
+        Message m = consumer2.receive(100);
+        assertNull("consumer 2 has some messages", m);
+
+        // Close the group
+        TextMessage message = session.createTextMessage("message " + 5);
+        message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+        message.setIntProperty("JMSXGroupSeq", -1);
+        LOG.info("sending message: " + message);
+        producer.send(message);
+
+        //Send some more messages
+        for (int i = 0; i < 4; i++) {
+            message = session.createTextMessage("message " + i);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            LOG.info("sending message: " + message);
+            producer.send(message);
+        }
+
+        // Receive the fourth message
+        TextMessage m1 = (TextMessage)consumer1.receive(500);
+        assertNotNull("m1 is null for index: " + 4, m1);
+
+        // Receive the closing message
+        m1 = (TextMessage)consumer1.receive(500);
+        assertNotNull("m1 is null for index: " + 5, m1);
+
+        //assert that there are no messages for the consumer 1
+        m = consumer1.receive(100);
+        assertNull("consumer 1 has some messages left", m);
+
+        // The messages should now go to the second consumer.
+        for (int i = 0; i < 4; i++) {
+            m1 = (TextMessage)consumer2.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+        }
+
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.region.policy.PolicyEntry;
+import org.apache.activemq.legacy.broker.region.policy.PolicyMap;
+import org.apache.activemq.legacy.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.legacy.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+
+public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
+
+    /**
+     * Builds the broker used by these tests: non-persistent, JMX disabled, with
+     * a default destination policy whose memory limit is 1 and with
+     * sendFailIfNoSpace enabled on the system usage. A TCP connector on
+     * localhost port 0 is added and remembered in {@code connector}.
+     *
+     * @return the configured broker service
+     * @throws Exception if the connector cannot be added
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+
+        // Default policy applied to every destination: memory limit of 1, so a
+        // destination is meant to take only 1 message at a time.
+        PolicyMap destinationPolicy = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setMemoryLimit(1);
+        defaultEntry.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+        defaultEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        destinationPolicy.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(destinationPolicy);
+
+        // This is what distinguishes this broker from the one of the parent test.
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+
+        connector = broker.addConnector("tcp://localhost:0");
+        return broker;
+    }
+    
+    /**
+     * Intentionally empty override of the inherited test of the same name; the
+     * comment in the body gives the reason.
+     */
+    @Override
+    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+        // with sendFailIfNoSpace set, there is no blocking of the connection
+    }
+    
+    /**
+     * Fills QUEUE.A from a background thread using async sends until
+     * {@code waitForBlockedOrResourceLimit} returns, then checks that 10
+     * messages can still be received and acknowledged.
+     *
+     * @throws Exception if the connection cannot be set up or a JMS call fails
+     */
+    @Override
+    public void testPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        // with sendFail, there must be no flow control window
+        // sendFail is an alternative flow control mechanism that does not block
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+        Thread thread = new Thread("Filler") {
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    try {
+                        producer.send(session.createTextMessage("Test message"));
+                        if (gotResourceException.get()) {
+                            // do not flood the broker with requests when full as we are sending async and they
+                            // will be limited by the network buffers
+                            Thread.sleep(200);
+                        }
+                    } catch (Exception e) {
+                        // with async send, there will be no exceptions
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+        thread.start();
+        try {
+            // The flag passed here is never cleared by the filler thread; the
+            // wait ends through that flag or through gotResourceException.
+            waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+
+            // resourceException on second message, resumption if we
+            // can receive 10
+            MessageConsumer consumer = session.createConsumer(queueA);
+            for (int idx = 0; idx < 10; ++idx) {
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                // receive(timeout) yields null on timeout: fail with a clear
+                // message instead of a NullPointerException on acknowledge().
+                assertNotNull("no message received for index: " + idx, msg);
+                msg.acknowledge();
+            }
+        } finally {
+            // Always release the filler thread, also when the test fails.
+            keepGoing.set(false);
+        }
+    }
+
+    
+	@Override
+	protected ConnectionFactory createConnectionFactory() throws Exception {
+		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connector.getConnectUri());
+		connectionFactory.setExceptionListener(new ExceptionListener() {
+				public void onException(JMSException arg0) {
+					if (arg0 instanceof ResourceAllocationException) {
+						gotResourceException.set(true);
+					}
+				}
+	        });
+		return connectionFactory;
+	}
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlSendFailTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/ProducerFlowControlTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.TransportConnector;
+import org.apache.activemq.legacy.broker.region.policy.PolicyEntry;
+import org.apache.activemq.legacy.broker.region.policy.PolicyMap;
+import org.apache.activemq.legacy.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.legacy.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+public class ProducerFlowControlTest extends JmsTestSupport {
+
+    ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
+    ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
+    protected TransportConnector connector;
+    protected ActiveMQConnection connection;
+    // used to test sendFailIfNoSpace on SystemUsage 
+    protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
+
+    /**
+     * With a producer window of 1024 * 64 configured on the factory, fills
+     * QUEUE.A and then checks that two publishes to QUEUE.B on the same
+     * connection each complete within two seconds and can be consumed.
+     *
+     * @throws Exception if the connection cannot be set up or a JMS call fails
+     */
+    public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory windowedFactory = (ActiveMQConnectionFactory)createConnectionFactory();
+        windowedFactory.setProducerWindowSize(1024 * 64);
+        connection = (ActiveMQConnection)windowedFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer queueBConsumer = consumerSession.createConsumer(queueB);
+
+        // Fill queue A: a few sends should not block until the producer
+        // window is used up.
+        fillQueue(queueA);
+
+        // Sending to queue B should not block since the connection itself
+        // should not be blocked.
+        for (String payload : new String[] {"Message 1", "Message 2"}) {
+            CountDownLatch published = asyncSendTo(queueB, payload);
+            assertTrue(published.await(2, TimeUnit.SECONDS));
+
+            TextMessage delivered = (TextMessage)queueBConsumer.receive();
+            assertEquals(payload, delivered.getText());
+            delivered.acknowledge();
+        }
+    }
+
+    /**
+     * Checks that a producer which stopped making progress on QUEUE.A resumes
+     * sending once messages are consumed.
+     * <p>
+     * A background thread clears the {@code done} flag before every send. The
+     * test waits until the flag stays set, consumes five messages, waits one
+     * more second and then expects the flag to have been cleared again.
+     *
+     * @throws Exception if the connection cannot be set up or a JMS call fails
+     */
+    public void testPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setProducerWindowSize(1024 * 64);
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+        Thread thread = new Thread("Filler") {
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    try {
+                        producer.send(session.createTextMessage("Test message"));
+                    } catch (JMSException ignored) {
+                        // Deliberately ignored, as in the original test: the
+                        // loop simply tries the next send.
+                    }
+                }
+            }
+        };
+        thread.start();
+        try {
+            waitForBlockedOrResourceLimit(done);
+
+            // after receiving messages, producer should continue sending messages
+            // (done == false)
+            MessageConsumer consumer = session.createConsumer(queueA);
+            for (int idx = 0; idx < 5; ++idx) {
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                // receive(timeout) yields null on timeout: fail with a clear
+                // message instead of a NullPointerException on acknowledge().
+                assertNotNull("no message received for index: " + idx, msg);
+                msg.acknowledge();
+            }
+            Thread.sleep(1000);
+        } finally {
+            // Always release the filler thread, also when the test fails.
+            keepGoing.set(false);
+        }
+
+        assertFalse(done.get());
+    }
+    
+    /**
+     * With alwaysSyncSend enabled on the factory, fills QUEUE.A and then checks
+     * that two publishes to QUEUE.B on the same connection each complete within
+     * two seconds and can be consumed.
+     *
+     * @throws Exception if the connection cannot be set up or a JMS call fails
+     */
+    public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory syncFactory = (ActiveMQConnectionFactory)createConnectionFactory();
+        syncFactory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection)syncFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer queueBConsumer = consumerSession.createConsumer(queueB);
+
+        // Fill queue A: the 1st send should not block, but the rest will.
+        fillQueue(queueA);
+
+        // Sending to queue B should not block.
+        for (String payload : new String[] {"Message 1", "Message 2"}) {
+            CountDownLatch published = asyncSendTo(queueB, payload);
+            assertTrue(published.await(2, TimeUnit.SECONDS));
+
+            TextMessage delivered = (TextMessage)queueBConsumer.receive();
+            assertEquals(payload, delivered.getText());
+            delivered.acknowledge();
+        }
+    }
+
+    /**
+     * Sanity check without any queue filling: with alwaysSyncSend enabled, two
+     * publishes to QUEUE.A each complete within two seconds and are received
+     * in order by a consumer on the same queue.
+     *
+     * @throws Exception if the connection cannot be set up or a JMS call fails
+     */
+    public void testSimpleSendReceive() throws Exception {
+        ActiveMQConnectionFactory syncFactory = (ActiveMQConnectionFactory)createConnectionFactory();
+        syncFactory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection)syncFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer queueAConsumer = consumerSession.createConsumer(queueA);
+
+        // Sending to queue A should not block.
+        for (String payload : new String[] {"Message 1", "Message 2"}) {
+            CountDownLatch published = asyncSendTo(queueA, payload);
+            assertTrue(published.await(2, TimeUnit.SECONDS));
+
+            TextMessage delivered = (TextMessage)queueAConsumer.receive();
+            assertEquals(payload, delivered.getText());
+            delivered.acknowledge();
+        }
+    }
+
+    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        // Test sending to Queue A
+        // 1st send should not block.
+        fillQueue(queueA);
+
+        // Test sending to Queue B it should block.
+        // Since even though the it's queue limits have not been reached, the
+        // connection
+        // is blocked.
+        CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+        assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Publishes non-persistent messages to the given queue from a background
+     * thread and returns once {@code waitForBlockedOrResourceLimit} reports
+     * that the producer stopped making progress.
+     *
+     * @param queue the queue to fill
+     * @throws JMSException declared for callers; not thrown by the visible code
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+        // Starts an async thread that every time it publishes it sets the done
+        // flag to false.
+        // Once the send starts to block it will not reset the done flag
+        // anymore.
+        new Thread("Fill thread.") {
+            public void run() {
+                Session session = null;
+                try {
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    while (keepGoing.get()) {
+                        done.set(false);
+                        producer.send(session.createTextMessage("Hello World"));
+                    }
+                } catch (JMSException ignored) {
+                    // Deliberately ignored, as in the original helper: a JMS
+                    // failure just ends this fill thread.
+                } finally {
+                    safeClose(session);
+                }
+            }
+        }.start();
+
+        try {
+            waitForBlockedOrResourceLimit(done);
+        } finally {
+            // Clear the flag even when the wait is interrupted, so the fill
+            // thread does not keep publishing after this method has returned.
+            keepGoing.set(false);
+        }
+    }
+
+    /**
+     * Polls once per second until the given flag is found set or
+     * {@code gotResourceException} is set.
+     * <p>
+     * After every unsuccessful check the flag is set to {@code true}; a
+     * producer that is still sending is expected to clear it again before the
+     * next check.
+     *
+     * @param done flag cleared by the producer thread on every send attempt
+     * @throws InterruptedException if interrupted while sleeping
+     */
+    protected void waitForBlockedOrResourceLimit(final AtomicBoolean done)
+            throws InterruptedException {
+        Thread.sleep(1000);
+        // the producer is blocked once the done flag stays true or there is a resource exception
+        while (!done.get() && !gotResourceException.get()) {
+            done.set(true);
+            Thread.sleep(1000);
+        }
+    }
+
+    /**
+     * Sends one non-persistent text message to the given queue from a new
+     * thread, using a fresh session on the shared {@code connection}.
+     *
+     * @param queue the destination queue
+     * @param message the text of the message to send
+     * @return a latch that is counted down only after the send has returned
+     * @throws JMSException declared for callers; not thrown by the visible code
+     */
+    private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
+        final CountDownLatch sent = new CountDownLatch(1);
+        Thread sender = new Thread("Send thread.") {
+            public void run() {
+                Session sendSession = null;
+                try {
+                    sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer sendProducer = sendSession.createProducer(queue);
+                    sendProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    TextMessage payload = sendSession.createTextMessage(message);
+                    sendProducer.send(payload);
+                    sent.countDown();
+                } catch (JMSException e) {
+                    // ignored: a failed send leaves the latch un-counted
+                } finally {
+                    safeClose(sendSession);
+                }
+            }
+        };
+        sender.start();
+        return sent;
+    }
+
+    /**
+     * Builds the broker used by these tests: non-persistent, JMX disabled, with
+     * a default destination policy whose memory limit is 1. A TCP connector on
+     * localhost port 0 is added and remembered in {@code connector}.
+     *
+     * @return the configured broker service
+     * @throws Exception if the connector cannot be added
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+
+        // Default policy applied to every destination: memory limit of 1, so a
+        // destination is meant to take only 1 message at a time.
+        PolicyMap destinationPolicy = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setMemoryLimit(1);
+        defaultEntry.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+        defaultEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        destinationPolicy.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(destinationPolicy);
+
+        connector = broker.addConnector("tcp://localhost:0");
+        return broker;
+    }
+
+    /**
+     * Calls {@code setAutoFail(true)} and then runs the inherited set-up.
+     *
+     * @throws Exception if the inherited set-up fails
+     */
+    public void setUp() throws Exception {
+        setAutoFail(true);
+        super.setUp();
+    }
+    
+    /**
+     * Forces the test connection's transport down and then always runs the
+     * inherited tear-down.
+     * <p>
+     * The inherited tear-down now runs even when a test never created a
+     * connection, or when disposing the transport throws; previously it was
+     * skipped in both cases.
+     *
+     * @throws Exception if disposing the transport or the inherited tear-down fails
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                // Report a failure to the transport listener, then stop the transport.
+                // NOTE(review): presumably this releases producers still blocked in send() - confirm.
+                TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class);
+                t.getTransportListener().onException(new IOException("Disposed."));
+                connection.getTransport().stop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Creates a connection factory pointing at the connect URI of the broker's
+     * connector.
+     *
+     * @return a new connection factory for the test broker
+     * @throws Exception if the connect URI cannot be obtained
+     */
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri());
+        return factory;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * Test cases for the connection's redelivery policy: redelivery delays after a rollback, the maximum number of redeliveries and DLQ handling.
+ * 
+ * @version $Revision$
+ */
+public class RedeliveryPolicyTest extends JmsTestSupport {
+
+    /**
+     * @return the test suite built by {@code suite(RedeliveryPolicyTest.class)}
+     */
+    public static Test suite() {
+        return suite(RedeliveryPolicyTest.class);
+    }
+
+    /**
+     * Runs {@link #suite()} with the JUnit text-based test runner.
+     *
+     * @param args not used
+     */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /**
+     * With exponential back-off enabled (initial delay 500, multiplier 2),
+     * checks that the rolled-back "1st" message comes back immediately after
+     * the first rollback and later and later after the following rollbacks.
+     *
+     * @throws Exception if a JMS operation fails
+     */
+    public void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
+
+        // Configure the connection's redelivery policy before starting it.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(500);
+        redelivery.setBackOffMultiplier((short) 2);
+        redelivery.setUseExponentialBackOff(true);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue(getName());
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage received = (TextMessage)receiver.receive(1000);
+        assertNotNull(received);
+        assertEquals("1st", received.getText());
+        txSession.rollback();
+
+        // First rollback: a message is expected again within 100 ms.
+        received = (TextMessage)receiver.receive(100);
+        assertNotNull(received);
+        txSession.rollback();
+
+        // Second rollback: nothing within 100 ms, "1st" within a further 700 ms.
+        received = (TextMessage)receiver.receive(100);
+        assertNull(received);
+
+        received = (TextMessage)receiver.receive(700);
+        assertNotNull(received);
+        assertEquals("1st", received.getText());
+        txSession.rollback();
+
+        // Third rollback: nothing within 100 ms plus 500 ms, "1st" within a
+        // further 700 ms, i.e. the delay grew compared to the previous round.
+        received = (TextMessage)receiver.receive(100);
+        assertNull(received);
+        received = (TextMessage)receiver.receive(500);
+        assertNull(received);
+        received = (TextMessage)receiver.receive(700);
+        assertNotNull(received);
+        assertEquals("1st", received.getText());
+
+    }
+
+
+    /**
+     * With only the initial redelivery delay set to 500 (no exponential
+     * back-off configured here), checks that the rolled-back "1st" message
+     * comes back immediately after the first rollback and after a constant
+     * delay after the following rollbacks.
+     *
+     * @throws Exception if a JMS operation fails
+     */
+    public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
+
+        // Configure the connection's redelivery policy before starting it.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(500);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue(getName());
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage received = (TextMessage)receiver.receive(1000);
+        assertNotNull(received);
+        assertEquals("1st", received.getText());
+        txSession.rollback();
+
+        // First rollback: a message is expected again within 100 ms.
+        received = (TextMessage)receiver.receive(100);
+        assertNotNull(received);
+        txSession.rollback();
+
+        // Second rollback: nothing within 100 ms, "1st" within a further 700 ms.
+        received = (TextMessage)receiver.receive(100);
+        assertNull(received);
+        received = (TextMessage)receiver.receive(700);
+        assertNotNull(received);
+        assertEquals("1st", received.getText());
+        txSession.rollback();
+
+        // Third rollback: same timing as before, since the message gets
+        // redelivered after 500 ms every time without exponential back-off.
+        received = (TextMessage)receiver.receive(100);
+        assertNull(received);
+        received = (TextMessage)receiver.receive(700);
+        assertNotNull(received);
+        assertEquals("1st", received.getText());
+
+    }
+
+    /**
+     * With maximumRedeliveries set to 2, rolls the "1st" message back three
+     * times and then expects "2nd" on the test queue and "1st" on the
+     * "ActiveMQ.DLQ" queue.
+     *
+     * @throws Exception if a JMS operation fails
+     */
+    public void testDLQHandling() throws Exception {
+
+        // Configure the connection's redelivery policy before starting it.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(100);
+        redelivery.setUseExponentialBackOff(false);
+        redelivery.setMaximumRedeliveries(2);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+        MessageConsumer dlqReceiver = txSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage received;
+
+        // Initial delivery plus two redeliveries of "1st", each rolled back.
+        for (int delivery = 0; delivery < 3; delivery++) {
+            received = (TextMessage)receiver.receive(1000);
+            assertNotNull(received);
+            assertEquals("1st", received.getText());
+            txSession.rollback();
+        }
+
+        // The last rollback should cause the 1st message to get sent to the DLQ
+        received = (TextMessage)receiver.receive(1000);
+        assertNotNull(received);
+        assertEquals("2nd", received.getText());
+        txSession.commit();
+
+        // We should be able to get the message off the DLQ now.
+        received = (TextMessage)dlqReceiver.receive(1000);
+        assertNotNull(received);
+        assertEquals("1st", received.getText());
+        txSession.commit();
+
+    }
+    
+    
+    /**
+     * With maximumRedeliveries set to -1 (no maximum), rolls the "1st" message
+     * back five times, expects it a sixth time, commits it and then expects
+     * "2nd".
+     *
+     * @throws Exception if a JMS operation fails
+     */
+    public void testInfiniteMaximumNumberOfRedeliveries() throws Exception {
+
+        // Configure the connection's redelivery policy before starting it.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(100);
+        redelivery.setUseExponentialBackOff(false);
+        // let's set the maximum redeliveries to no maximum (ie. infinite)
+        redelivery.setMaximumRedeliveries(-1);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage received;
+
+        // we should be able to get the 1st message redelivered until a
+        // session.commit is called: five deliveries, each rolled back
+        for (int delivery = 0; delivery < 5; delivery++) {
+            received = (TextMessage)receiver.receive(1000);
+            assertNotNull(received);
+            assertEquals("1st", received.getText());
+            txSession.rollback();
+        }
+
+        // Sixth delivery of "1st" is committed.
+        received = (TextMessage)receiver.receive(1000);
+        assertNotNull(received);
+        assertEquals("1st", received.getText());
+        txSession.commit();
+
+        received = (TextMessage)receiver.receive(1000);
+        assertNotNull(received);
+        assertEquals("2nd", received.getText());
+        txSession.commit();
+
+    }
+    
+    /**
+     * With maximumRedeliveries set to 0, rolls the "1st" message back once and
+     * expects the next received message to be "2nd".
+     *
+     * @throws Exception if a JMS operation fails
+     */
+    public void testZeroMaximumNumberOfRedeliveries() throws Exception {
+
+        // Configure the connection's redelivery policy before starting it.
+        RedeliveryPolicy redelivery = connection.getRedeliveryPolicy();
+        redelivery.setInitialRedeliveryDelay(100);
+        redelivery.setUseExponentialBackOff(false);
+        // let's set the maximum redeliveries to 0
+        redelivery.setMaximumRedeliveries(0);
+
+        connection.start();
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        MessageProducer sender = txSession.createProducer(queue);
+
+        MessageConsumer receiver = txSession.createConsumer(queue);
+
+        // Publish two messages in one transaction.
+        sender.send(txSession.createTextMessage("1st"));
+        sender.send(txSession.createTextMessage("2nd"));
+        txSession.commit();
+
+        TextMessage received = (TextMessage)receiver.receive(1000);
+        assertNotNull(received);
+        assertEquals("1st", received.getText());
+        txSession.rollback();
+
+        // the 1st message should not be redelivered since maximumRedeliveries is set to 0
+        received = (TextMessage)receiver.receive(1000);
+        assertNotNull(received);
+        assertEquals("2nd", received.getText());
+        txSession.commit();
+
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/RedeliveryPolicyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * Checks that a consumer which is already blocked in
+ * {@code receive(timeout)} is woken up when a message is sent to its queue
+ * afterwards.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ConsumerReceiveWithTimeoutTest extends TestSupport {
+
+    private Connection connection;
+
+    /**
+     * Creates the connection shared by the consuming and producing side of
+     * the test.
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * Closes the connection and always delegates to the parent tear down,
+     * even when closing the connection fails.
+     *
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            connection = null;
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Test to check if consumer thread wakes up inside a receive(timeout) after
+     * a message is dispatched to the consumer
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testConsumerReceiveBeforeMessageDispatched() throws JMSException {
+
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue queue = session.createQueue("test");
+
+        // A JMS session is a single-threaded context. The main thread blocks
+        // in receive() on 'session', so the producer thread gets a session of
+        // its own instead of sharing that one.
+        final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Written by the producer thread, read by the main thread after join().
+        final Throwable[] producerFailure = new Throwable[1];
+
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    // wait for 10 seconds to allow consumer.receive to be run
+                    // first
+                    Thread.sleep(10000);
+                    MessageProducer producer = producerSession.createProducer(queue);
+                    producer.send(producerSession.createTextMessage("Hello"));
+                } catch (Throwable e) {
+                    // Reported by the main thread so the test fails with the
+                    // real cause instead of a bare "message was null".
+                    producerFailure[0] = e;
+                }
+            }
+        };
+
+        t.start();
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(60000);
+
+        // The producer is done (or nearly done) once receive() returns; the
+        // bounded join only guards against a hung send.
+        try {
+            t.join(15000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            fail("Interrupted while waiting for the producer thread");
+        }
+        if (producerFailure[0] != null) {
+            producerFailure[0].printStackTrace();
+            fail("Producer thread failed: " + producerFailure[0]);
+        }
+
+        assertNotNull(msg);
+        session.close();
+        producerSession.close();
+
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/ConsumerReceiveWithTimeoutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.store.PersistenceAdapter;
+
+/**
+ * Base class for durable topic subscription tests. Each test runs against an
+ * embedded broker named "durable-broker" whose persistence adapter is
+ * supplied by the concrete subclass through
+ * {@link #createPersistenceAdapter()}.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public abstract class DurableSubscriptionTestSupport extends TestSupport {
+
+    private Connection connection;
+    private Session session;
+    private TopicSubscriber consumer;
+    private MessageProducer producer;
+    private BrokerService broker;
+
+    /**
+     * @return a factory that connects to the embedded "durable-broker" over
+     *         the vm transport
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://durable-broker");
+    }
+
+    /**
+     * Creates a connection whose client ID is the name of the running test,
+     * so that durable subscriptions can be re-attached by later connections
+     * of the same test.
+     */
+    protected Connection createConnection() throws Exception {
+        Connection rc = super.createConnection();
+        rc.setClientID(getName());
+        return rc;
+    }
+
+    /**
+     * Starts a clean broker (and opens the test connection) before the
+     * parent set up runs.
+     */
+    protected void setUp() throws Exception {
+        createBroker();
+        super.setUp();
+    }
+
+    /**
+     * Runs the parent tear down and then always shuts the connection and the
+     * broker down, even when the parent tear down fails.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            destroyBroker();
+        }
+    }
+
+    /**
+     * Stops the broker and starts a new one on the same store without
+     * deleting the stored messages. The {@code connection} field is replaced
+     * by a fresh, not yet started connection.
+     */
+    protected void restartBroker() throws Exception {
+        destroyBroker();
+        createRestartedBroker(); // retain stored messages
+    }
+
+    /**
+     * Starts a broker that wipes its message store on start-up and opens the
+     * test connection.
+     */
+    private void createBroker() throws Exception {
+        startBroker(true);
+    }
+
+    /**
+     * Starts a broker that keeps whatever is in its message store and opens
+     * the test connection.
+     */
+    private void createRestartedBroker() throws Exception {
+        startBroker(false);
+    }
+
+    /**
+     * Configures and starts the embedded broker, then opens the connection
+     * used by the tests. Start-up failures are propagated so that a test
+     * never continues against a broker that did not come up.
+     *
+     * @param deleteAllMessagesOnStartup whether the broker should discard the
+     *        contents of its store when it starts
+     * @throws Exception if the broker cannot be started or the connection
+     *         cannot be created
+     */
+    private void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+        try {
+            broker = new BrokerService();
+            broker.setBrokerName("durable-broker");
+            broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+            broker.setPersistenceAdapter(createPersistenceAdapter());
+            broker.setPersistent(true);
+            broker.start();
+
+            connection = createConnection();
+        } catch (Exception e) {
+            // Release whatever was started so a failed start-up cannot leave
+            // a half-initialised broker behind for the following tests.
+            try {
+                destroyBroker();
+            } catch (Exception ignored) {
+                // The original start-up failure is the one worth reporting.
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the test connection and stops the broker. The broker is stopped
+     * even when closing the connection fails, and both fields are cleared so
+     * that a second call is a no-op.
+     */
+    private void destroyBroker() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            connection = null;
+            if (broker != null) {
+                try {
+                    broker.stop();
+                } finally {
+                    broker = null;
+                }
+            }
+        }
+    }
+
+    /**
+     * @return the persistence adapter the embedded broker should store its
+     *         messages and durable subscriptions in
+     */
+    protected abstract PersistenceAdapter createPersistenceAdapter() throws Exception;
+
+    /**
+     * After a durable subscription has been unsubscribed, re-creating it
+     * under the same name must start from scratch: the message sent before
+     * the unsubscribe is not delivered, only the one sent afterwards.
+     */
+    public void testUnsubscribeSubscription() throws Exception {
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("TestTopic");
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        connection.start();
+
+        // Make sure it works when the durable sub is active.
+        producer.send(session.createTextMessage("Msg:1"));
+        assertTextMessageEquals("Msg:1", consumer.receive(5000));
+
+        // Deactivate the sub.
+        consumer.close();
+        // Send a new message while the sub is inactive, then drop the sub.
+        producer.send(session.createTextMessage("Msg:2"));
+        session.unsubscribe("sub1");
+
+        // Reopen the connection.
+        connection.close();
+        connection = createConnection();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(topic);
+        connection.start();
+
+        // Create the sub again under the same name.
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        producer.send(session.createTextMessage("Msg:3"));
+
+        // Msg:2 went away with the old subscription; Msg:3 comes first.
+        assertTextMessageEquals("Msg:3", consumer.receive(5000));
+    }
+
+    /**
+     * A message sent while the durable subscription is inactive must be
+     * delivered when the subscription is re-activated from a new connection.
+     */
+    public void testInactiveDurableSubscriptionTwoConnections() throws Exception {
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("TestTopic");
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        connection.start();
+
+        // Make sure it works when the durable sub is active.
+        producer.send(session.createTextMessage("Msg:1"));
+        assertTextMessageEquals("Msg:1", consumer.receive(5000));
+
+        // Deactivate the sub.
+        consumer.close();
+
+        // Send a new message.
+        producer.send(session.createTextMessage("Msg:2"));
+
+        // Reopen the connection.
+        connection.close();
+        connection = createConnection();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        // Activate the sub.
+        consumer = session.createDurableSubscriber(topic, "sub1");
+
+        // Try to get the message.
+        assertTextMessageEquals("Msg:2", consumer.receive(5000));
+    }
+
+    /**
+     * A message sent while the durable subscription is inactive must survive
+     * a broker restart and be delivered exactly once afterwards.
+     */
+    public void testInactiveDurableSubscriptionBrokerRestart() throws Exception {
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("TestTopic");
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        connection.start();
+
+        // Make sure it works when the durable sub is active.
+        producer.send(session.createTextMessage("Msg:1"));
+        assertTextMessageEquals("Msg:1", consumer.receive(5000));
+
+        // Deactivate the sub.
+        consumer.close();
+
+        // Send a new message.
+        producer.send(session.createTextMessage("Msg:2"));
+
+        // Restart the broker; this also replaces the connection.
+        restartBroker();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        // Activate the sub.
+        consumer = session.createDurableSubscriber(topic, "sub1");
+
+        // Try to get the message, and make sure nothing else follows it.
+        assertTextMessageEquals("Msg:2", consumer.receive(5000));
+        assertNull(consumer.receive(5000));
+    }
+
+    /**
+     * A durable subscription created before a broker restart must still
+     * exist afterwards: a message sent before the subscriber re-attaches is
+     * delivered together with one sent after it re-attached.
+     */
+    public void testDurableSubscriptionPersistsPastBrokerRestart() throws Exception {
+
+        // Create the durable sub.
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Ensure that consumer will receive messages sent before it was created
+        Topic topic = session.createTopic("TestTopic?consumer.retroactive=true");
+        consumer = session.createDurableSubscriber(topic, "sub1");
+
+        // Restart the broker.
+        restartBroker();
+
+        // Reconnection
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        // Send a message while the durable sub has not been re-activated yet.
+        producer.send(session.createTextMessage("Msg:1"));
+
+        // Activate the sub.
+        consumer = session.createDurableSubscriber(topic, "sub1");
+
+        // Send a new message.
+        producer.send(session.createTextMessage("Msg:2"));
+
+        // Both messages must arrive, in order, and nothing else.
+        assertTextMessageEquals("Msg:1", consumer.receive(5000));
+        assertTextMessageEquals("Msg:2", consumer.receive(5000));
+
+        assertNull(consumer.receive(5000));
+    }
+
+    /**
+     * Same scenario as the two-connection test, but the subscription is
+     * re-activated on the original connection and session.
+     * NOTE(review): the "x" prefix presumably keeps the JUnit runner from
+     * picking this method up - confirm whether it is disabled on purpose.
+     */
+    public void xtestInactiveDurableSubscriptionOneConnection() throws Exception {
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("TestTopic");
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        connection.start();
+
+        // Make sure it works when the durable sub is active.
+        producer.send(session.createTextMessage("Msg:1"));
+        assertTextMessageEquals("Msg:1", consumer.receive(5000));
+
+        // Deactivate the sub.
+        consumer.close();
+
+        // Send a new message.
+        producer.send(session.createTextMessage("Msg:2"));
+
+        // Activate the sub.
+        consumer = session.createDurableSubscriber(topic, "sub1");
+
+        // Try to get the message.
+        assertTextMessageEquals("Msg:2", consumer.receive(5000));
+    }
+
+    /**
+     * Re-creating a durable subscription with a different selector must make
+     * the new selector take effect: with {@code color='red'} only the red
+     * message arrives, after switching to {@code color='blue'} only the blue
+     * one does.
+     */
+    public void testSelectorChange() throws Exception {
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("TestTopic");
+        consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false);
+        producer = session.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        connection.start();
+
+        // Send one non-matching (blue) and one matching (red) message. The
+        // same message object is re-used with new text and property values.
+        TextMessage msg = session.createTextMessage();
+        msg.setText("Msg:1");
+        msg.setStringProperty("color", "blue");
+        producer.send(msg);
+        msg.setText("Msg:2");
+        msg.setStringProperty("color", "red");
+        producer.send(msg);
+
+        assertTextMessageEquals("Msg:2", consumer.receive(5000));
+
+        // Change the subscription
+        consumer.close();
+        consumer = session.createDurableSubscriber(topic, "sub1", "color='blue'", false);
+
+        // Send a new message.
+        msg.setText("Msg:3");
+        msg.setStringProperty("color", "red");
+        producer.send(msg);
+        msg.setText("Msg:4");
+        msg.setStringProperty("color", "blue");
+        producer.send(msg);
+
+        // Try to get the message.
+        assertTextMessageEquals("Msg:4", consumer.receive(5000));
+    }
+
+    /**
+     * A durable subscription created in one session must keep collecting
+     * messages after that session is closed and hand them to a subscriber
+     * created in a new session of the same connection.
+     */
+    public void testDurableSubWorksInNewSession() throws JMSException {
+
+        // Create the consumer.
+        connection.start();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = session.createTopic("topic-" + getName());
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        // Drain any messages that may already be in the sub
+        while (consumer.receive(1000) != null) {
+        }
+
+        // See if the durable sub works in a new session.
+        session.close();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Send a Message that should be added to the durable sub.
+        producer = createProducer(session, topic);
+        producer.send(session.createTextMessage("Message 1"));
+
+        // Activate the durable sub now. And receive the message.
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        assertTextMessageEquals("Message 1", consumer.receive(1000));
+
+    }
+
+    /**
+     * A durable subscription created on one connection must keep collecting
+     * messages after that connection is closed and hand them to a subscriber
+     * created on a new connection.
+     */
+    public void testDurableSubWorksInNewConnection() throws Exception {
+
+        // Create the consumer.
+        connection.start();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = session.createTopic("topic-" + getName());
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        // Drain any messages that may already be in the sub
+        while (consumer.receive(1000) != null) {
+        }
+
+        // See if the durable sub works in a new connection.
+        // The old connection is closed first: createConnection() gives every
+        // connection of this test the same client ID, and the embedded
+        // broker started in setUp() keeps running in between.
+        connection.close();
+        connection = createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Send a Message that should be added to the durable sub.
+        producer = createProducer(session, topic);
+        producer.send(session.createTextMessage("Message 1"));
+
+        // Activate the durable sub now. And receive the message.
+        consumer = session.createDurableSubscriber(topic, "sub1");
+        assertTextMessageEquals("Message 1", consumer.receive(1000));
+
+    }
+
+    /**
+     * Creates a producer for the destination that uses the delivery mode
+     * returned by {@link #getDeliveryMode()}.
+     */
+    private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(getDeliveryMode());
+        return producer;
+    }
+
+    /**
+     * @return the delivery mode used by {@code createProducer}; persistent
+     *         unless a subclass overrides it
+     */
+    protected int getDeliveryMode() {
+        return DeliveryMode.PERSISTENT;
+    }
+
+    /**
+     * Asserts that the message is a non-null {@link TextMessage} carrying
+     * the expected text.
+     *
+     * @param string the expected text
+     * @param message the received message, may be {@code null}
+     */
+    private void assertTextMessageEquals(String string, Message message) throws JMSException {
+        assertNotNull("Message was null", message);
+        assertTrue("Message is not a TextMessage", message instanceof TextMessage);
+        assertEquals(string, ((TextMessage)message).getText());
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/DurableSubscriptionTestSupport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Runs the topic redelivery tests against a durable subscription and adds a
+ * check that an unacknowledged message is redelivered to a new session.
+ *
+ * @version $Revision: 1.4 $
+ */
+public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest {
+
+    private static final Log LOG = LogFactory.getLog(JMSDurableTopicRedeliverTest.class);
+
+    /**
+     * Switches the inherited fixture to durable mode before it is set up.
+     */
+    protected void setUp() throws Exception {
+        durable = true;
+        super.setUp();
+    }
+
+    /**
+     * Sends one message, receives it without acknowledging, then checks that
+     * a new session gets the same message again flagged as redelivered, and
+     * that nothing is delivered once it has been acknowledged.
+     * 
+     * @throws Exception
+     */
+    public void testRedeliverNewSession() throws Exception {
+        String text = "TEST: " + System.currentTimeMillis();
+        Message sendMessage = session.createTextMessage(text);
+
+        if (verbose) {
+            LOG.info("About to send a message: " + sendMessage + " with text: " + text);
+        }
+        producer.send(producerDestination, sendMessage);
+
+        // receive but don't acknowledge
+        Message unackMessage = consumer.receive(1000);
+        assertNotNull(unackMessage);
+        String unackId = unackMessage.getJMSMessageID();
+        // assertEquals takes (expected, actual); keep that order so failure
+        // messages report the values the right way round.
+        assertEquals(text, ((TextMessage)unackMessage).getText());
+        assertFalse(unackMessage.getJMSRedelivered());
+        assertEquals(1, unackMessage.getIntProperty("JMSXDeliveryCount"));
+        consumeSession.close();
+        consumer.close();
+
+        // receive then acknowledge
+        consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = createConsumer();
+        Message ackMessage = consumer.receive(1000);
+        assertNotNull(ackMessage);
+        ackMessage.acknowledge();
+        String ackId = ackMessage.getJMSMessageID();
+        assertEquals(text, ((TextMessage)ackMessage).getText());
+        assertTrue(ackMessage.getJMSRedelivered());
+        assertEquals(2, ackMessage.getIntProperty("JMSXDeliveryCount"));
+        assertEquals(unackId, ackId);
+        consumeSession.close();
+        consumer.close();
+
+        // the acknowledged message must not show up a third time
+        consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = createConsumer();
+        assertNull(consumer.receive(1000));
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSDurableTopicRedeliverTest.java
------------------------------------------------------------------------------
    svn:executable = *