You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/24 15:30:12 UTC

svn commit: r1365040 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/ apollo-itests/src/test/java/org/apache/activemq/apollo/

Author: chirino
Date: Tue Jul 24 13:30:11 2012
New Revision: 1365040

URL: http://svn.apache.org/viewvc?rev=1365040&view=rev
Log:
Ported the JMSConsumerTest from ActiveMQ 5.x

Added:
    activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java
    activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java   (with props)
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java
    activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java
    activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java
    activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Tue Jul 24 13:30:11 2012
@@ -23,47 +23,13 @@ import FileSupport._
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.dto.{AggregateDestMetricsDTO, QueueStatusDTO, TopicStatusDTO}
 
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class BrokerFunSuiteSupport extends FunSuiteSupport with Logging { // with ShouldMatchers with BeforeAndAfterEach with Logging {
-  var broker: Broker = null
-  var port = 0
-
-  def broker_config_uri = "xml:classpath:apollo.xml"
-
-  def createBroker: Broker = {
-    info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
-    var broker = BrokerFactory.createBroker(broker_config_uri)
-    broker.setTmp(basedir / "target" / "tmp")
-    broker.getTmp().mkdirs()
-    broker
-  }
-
-  override def beforeAll() = {
-    super.beforeAll()
-    try {
-      broker = createBroker
-      ServiceControl.start(broker)
-      port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
-    } catch {
-      case e: Throwable => e.printStackTrace
-    }
-  }
+object BrokerTestSupport {
 
-  override def afterAll() = {
-    ServiceControl.stop(broker)
-    super.afterAll()
-  }
-
-  def connector_port(connector: String): Option[Int] = Option(connector).map {
-    id => broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(port)
+  def connector_port(broker:Broker, connector: String): Option[Int] = Option(connector).map {
+    id => broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(0)
   }
 
-  def queue_exists(name: String): Boolean = {
+  def queue_exists(broker:Broker, name: String): Boolean = {
     val host = broker.default_virtual_host
     host.dispatch_queue.future {
       val router = host.router.asInstanceOf[LocalRouter]
@@ -71,7 +37,7 @@ class BrokerFunSuiteSupport extends FunS
     }.await()
   }
 
-  def delete_queue(name: String) = {
+  def delete_queue(broker:Broker, name: String) = {
     val host = broker.default_virtual_host
     host.dispatch_queue.future {
       val router = host.router.asInstanceOf[LocalRouter]
@@ -81,7 +47,7 @@ class BrokerFunSuiteSupport extends FunS
     }.await()
   }
 
-  def topic_exists(name: String): Boolean = {
+  def topic_exists(broker:Broker, name: String): Boolean = {
     val host = broker.default_virtual_host
     host.dispatch_queue.future {
       val router = host.router.asInstanceOf[LocalRouter]
@@ -89,7 +55,7 @@ class BrokerFunSuiteSupport extends FunS
     }.await()
   }
 
-  def topic_status(name: String): TopicStatusDTO = {
+  def topic_status(broker:Broker, name: String): TopicStatusDTO = {
     val host = broker.default_virtual_host
     sync(host) {
       val router = host.router.asInstanceOf[LocalRouter]
@@ -97,28 +63,28 @@ class BrokerFunSuiteSupport extends FunS
     }
   }
 
-  def get_queue_metrics: AggregateDestMetricsDTO = {
+  def get_queue_metrics(broker:Broker): AggregateDestMetricsDTO = {
     val host = broker.default_virtual_host
     sync(host) {
       host.get_queue_metrics
     }
   }
 
-  def get_topic_metrics: AggregateDestMetricsDTO = {
+  def get_topic_metrics(broker:Broker): AggregateDestMetricsDTO = {
     val host = broker.default_virtual_host
     sync(host) {
       host.get_topic_metrics
     }
   }
 
-  def get_dsub_metrics: AggregateDestMetricsDTO = {
+  def get_dsub_metrics(broker:Broker): AggregateDestMetricsDTO = {
     val host = broker.default_virtual_host
     sync(host) {
       host.get_dsub_metrics
     }
   }
 
-  def queue_status(name: String): QueueStatusDTO = {
+  def queue_status(broker:Broker, name: String): QueueStatusDTO = {
     val host = broker.default_virtual_host
     sync(host) {
       val router = host.router.asInstanceOf[LocalRouter]
@@ -129,7 +95,7 @@ class BrokerFunSuiteSupport extends FunS
     }
   }
 
-  def dsub_status(name: String): QueueStatusDTO = {
+  def dsub_status(broker:Broker, name: String): QueueStatusDTO = {
     val host = broker.default_virtual_host
     sync(host) {
       val router = host.router.asInstanceOf[LocalRouter]
@@ -137,9 +103,57 @@ class BrokerFunSuiteSupport extends FunS
     }
   }
 
-  def webadmin_uri(scheme:String = "http") = {
+  def webadmin_uri(broker:Broker, scheme:String) = {
     Option(broker.web_server).flatMap(_.uris().find(_.getScheme == scheme)).get
   }
+}
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class BrokerFunSuiteSupport extends FunSuiteSupport with Logging { // with ShouldMatchers with BeforeAndAfterEach with Logging {
+  var broker: Broker = null
+  var port = 0
+
+  def broker_config_uri = "xml:classpath:apollo.xml"
+
+  def createBroker: Broker = {
+    info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
+    var broker = BrokerFactory.createBroker(broker_config_uri)
+    broker.setTmp(basedir / "target" / "tmp")
+    broker.getTmp().mkdirs()
+    broker
+  }
+
+  override def beforeAll() = {
+    super.beforeAll()
+    try {
+      broker = createBroker
+      ServiceControl.start(broker)
+      port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
+    } catch {
+      case e: Throwable => e.printStackTrace
+    }
+  }
+
+  override def afterAll() = {
+    ServiceControl.stop(broker)
+    super.afterAll()
+  }
+
+  def connector_port(connector: String) = BrokerTestSupport.connector_port(broker, connector)
+  def queue_exists(name: String) = BrokerTestSupport.queue_exists(broker, name)
+  def delete_queue(name: String) = BrokerTestSupport.delete_queue(broker, name)
+  def topic_exists(name: String) = BrokerTestSupport.topic_exists(broker, name)
+  def topic_status(name: String) = BrokerTestSupport.topic_status(broker, name)
+  def get_queue_metrics = BrokerTestSupport.get_queue_metrics(broker)
+  def get_topic_metrics = BrokerTestSupport.get_topic_metrics(broker)
+  def get_dsub_metrics = BrokerTestSupport.get_dsub_metrics(broker)
+  def queue_status(name: String) = BrokerTestSupport.queue_status(broker, name)
+  def dsub_status(name: String) = BrokerTestSupport.dsub_status(broker, name)
+  def webadmin_uri(scheme:String = "http") = BrokerTestSupport.webadmin_uri(broker, scheme)
 
   def json(value:Any) = org.apache.activemq.apollo.dto.JsonCodec.encode(value).ascii().toString;
 

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java Tue Jul 24 13:30:11 2012
@@ -18,11 +18,17 @@ package org.apache.activemq.apollo;
 
 import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerFactory;
+import org.apache.activemq.apollo.broker.BrokerFunSuiteSupport;
+import org.apache.activemq.apollo.broker.BrokerTestSupport;
+import org.apache.activemq.apollo.dto.DestMetricsDTO;
+import org.apache.activemq.apollo.dto.QueueStatusDTO;
+import org.apache.activemq.apollo.dto.TopicStatusDTO;
 import org.apache.activemq.apollo.util.ServiceControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 import java.net.InetSocketAddress;
 
 /**
@@ -51,6 +57,92 @@ abstract public class BrokerProtocol {
         return address.getPort();
     }
 
+    /**
+     * Looks up the broker-side metrics for a JMS destination.
+     *
+     * @param broker      the broker to query
+     * @param destination the destination whose metrics are wanted
+     * @return the metrics from the destination's status DTO, or null when the
+     *         destination type is unknown, is a temporary topic, or no status
+     *         DTO was returned for it
+     */
+    private DestMetricsDTO getMetrics(Broker broker, Destination destination) {
+        DestinationType type = DestinationType.of(destination);
+        if( type == null ) {
+            // Not a Queue or Topic: switching on null would throw a NullPointerException.
+            return null;
+        }
+        DestMetricsDTO metrics = null;
+        switch (type) {
+            case QUEUE_TYPE:
+            case TEMP_QUEUE_TYPE: {
+                QueueStatusDTO dto = BrokerTestSupport.queue_status(broker, name(destination));
+                if( dto != null ) {
+                    metrics = dto.metrics;
+                }
+                // Without this break the queue case fell through into the topic lookup.
+                break;
+            }
+            case TOPIC_TYPE: {
+                TopicStatusDTO dto = BrokerTestSupport.topic_status(broker, name(destination));
+                if( dto != null ) {
+                    metrics = dto.metrics;
+                }
+                break;
+            }
+            case TEMP_TOPIC_TYPE:
+            default:
+                // No metrics lookup is implemented for temporary topics.
+                break;
+        }
+        return metrics;
+    }
+
+    /**
+     * Reports the queue_size metric recorded for the destination.
+     * NOTE(review): despite the method name, the value returned is queue_size;
+     * confirm that it is the intended measure of in-flight messages.
+     *
+     * @param broker      the broker to query; it is cast to {@link Broker}
+     * @param destination the destination to inspect
+     * @return the queue_size metric, or 0 when no metrics are available
+     */
+    public long getInflightCount(Object broker, Destination destination) {
+        DestMetricsDTO metrics = getMetrics((Broker) broker, destination);
+        if( metrics==null ) {
+            return 0;
+        }
+        return metrics.queue_size;
+    }
+
+    /**
+     * Reports the dequeue_item_counter metric recorded for the destination.
+     *
+     * @param broker      the broker to query; it is cast to {@link Broker}
+     * @param destination the destination to inspect
+     * @return the dequeue_item_counter metric, or 0 when no metrics are available
+     */
+    public long getDequeueCount(Object broker, Destination destination) {
+        DestMetricsDTO metrics = getMetrics((Broker) broker, destination);
+        if( metrics==null ) {
+            return 0;
+        }
+        return metrics.dequeue_item_counter;
+    }
+
+//    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+//         String domain = "org.apache.activemq";
+//         ObjectName name;
+//        if (destination.isQueue()) {
+//            name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
+//        } else {
+//            name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
+//        }
+//        return (DestinationViewMBean)broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
+//    }
+
     abstract ConnectionFactory getConnectionFactory(Object broker);
+    /** Returns the name under which the destination's status is looked up on the broker. */
+    protected abstract String name(Destination destination);
 
 }

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java?rev=1365040&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java Tue Jul 24 13:30:11 2012
@@ -0,0 +1,34 @@
+package org.apache.activemq.apollo;
+
+import javax.jms.Destination;
+import javax.jms.*;
+
+
+/**
+ * Classifies a JMS {@link Destination} as a queue or topic, temporary or not.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum DestinationType {
+    QUEUE_TYPE,
+    TOPIC_TYPE,
+    TEMP_QUEUE_TYPE,
+    TEMP_TOPIC_TYPE;
+
+    /**
+     * Determines the type of a destination from the JMS interfaces it implements.
+     * The Queue interfaces are tested before the Topic interfaces.
+     *
+     * @param d the destination to classify
+     * @return the matching type, or null when d is neither a Queue nor a Topic
+     */
+    public static DestinationType of(Destination d) {
+        if( d instanceof Queue ) {
+            return (d instanceof TemporaryQueue) ? TEMP_QUEUE_TYPE : QUEUE_TYPE;
+        }
+        if( d instanceof Topic ) {
+            return (d instanceof TemporaryTopic) ? TEMP_TOPIC_TYPE : TOPIC_TYPE;
+        }
+        return null;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java?rev=1365040&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java Tue Jul 24 13:30:11 2012
@@ -0,0 +1,967 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.management.ObjectName;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test cases used to test the JMS message consumer.
+ *
+ * This test case currently has lots of tests commented out.. Look for the
+ * 'TODO's.  https://issues.apache.org/jira/browse/APLO-227 is tracking these
+ * issues.
+ */
+public class JMSConsumerTest extends JmsTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerTest.class);
+
+    public static Test suite() {
+        return suite(JMSConsumerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    public Destination destination;
+    public int deliveryMode;
+    public int prefetch;
+    public int ackMode;
+    public DestinationType destinationType;
+    public boolean durableConsumer;
+
+    // TODO: figure out why this test does not work /w stompjms
+    public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
+        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
+                                                              DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
+    }
+
+    public void testMessageListenerWithConsumerCanBeStopped() throws Exception {
+
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done1 = new CountDownLatch(1);
+        final CountDownLatch done2 = new CountDownLatch(1);
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                counter.incrementAndGet();
+                if (counter.get() == 1) {
+                    done1.countDown();
+                }
+                if (counter.get() == 2) {
+                    done2.countDown();
+                }
+            }
+        });
+
+        // Send a first message to make sure that the consumer dispatcher is
+        // running
+        sendMessages(session, destination, 1);
+        assertTrue(done1.await(1, TimeUnit.SECONDS));
+        assertEquals(1, counter.get());
+
+        // Stop the consumer.
+        consumer.stop();
+
+        // Send a message, but should not get delivered.
+        sendMessages(session, destination, 1);
+        assertFalse(done2.await(1, TimeUnit.SECONDS));
+        assertEquals(1, counter.get());
+
+        // Start the consumer, and the message should now get delivered.
+        consumer.start();
+        assertTrue(done2.await(1, TimeUnit.SECONDS));
+        assertEquals(2, counter.get());
+    }
+
+    // TODO: figure out why this test causes a OOM /w the stompjms client.
+    public void initCombosForTestMessageListenerWithConsumerCanBeStoppedConcurently() {
+        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
+    }
+    public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
+
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch closeDone = new CountDownLatch(1);
+
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = createDestination(session, DestinationType.QUEUE_TYPE);
+
+        // preload the queue
+        sendMessages(session, destination, 2000);
+
+
+        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+
+        final Map<Thread, Throwable> exceptions =
+            Collections.synchronizedMap(new HashMap<Thread, Throwable>());
+        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+            public void uncaughtException(Thread t, Throwable e) {
+                LOG.error("Uncaught exception:", e);
+                exceptions.put(t, e);
+            }
+        });
+
+        final class AckAndClose implements Runnable {
+            private Message message;
+
+            public AckAndClose(Message m) {
+                this.message = m;
+            }
+
+            public void run() {
+                try {
+                    int count = counter.incrementAndGet();
+                    if (count == 590) {
+                        // close in a separate thread is ok by jms
+                        consumer.close();
+                        closeDone.countDown();
+                    }
+                    if (count % 200 == 0) {
+                        // ensure there are some outstanding messages
+                        // ack every 200
+                        message.acknowledge();
+                    }
+                } catch (Exception e) {
+                    LOG.error("Exception on close or ack:", e);
+                    exceptions.put(Thread.currentThread(), e);
+                }
+            }
+        };
+
+        final ExecutorService executor = Executors.newCachedThreadPool();
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                // ack and close eventually in separate thread
+                executor.execute(new AckAndClose(m));
+            }
+        });
+
+        assertTrue(closeDone.await(20, TimeUnit.SECONDS));
+        // await possible exceptions
+        Thread.sleep(1000);
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+
+
+    public void initCombosForTestMutiReceiveWithPrefetch1() {
+        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
+                                                      Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+        addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
+                                                              DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
+    }
+    //
+    // TODO: support setting the prefetch policy on a stompjms connection.
+    // TODO: test is failing.
+    //
+    public void ignoreMutiReceiveWithPrefetch1() throws Exception {
+
+        // Set prefetch to 1
+        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Use all the ack modes
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        // Make sure 4 messages were delivered.
+        Message message = null;
+        for (int i = 0; i < 4; i++) {
+            message = consumer.receive(1000);
+            assertNotNull(message);
+        }
+        assertNull(consumer.receiveNoWait());
+        message.acknowledge();
+    }
+
+    //
+    // TODO: find out why this test is failing on a stompjms connection.
+    //
+    public void initCombosForTestDurableConsumerSelectorChange() {
+        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {DestinationType.TOPIC_TYPE});
+    }
+
+    public void testDurableConsumerSelectorChange() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.setClientID("test");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(deliveryMode);
+        MessageConsumer consumer = session.createDurableSubscriber((Topic)destination, "test", "color='red'", false);
+
+        // Send the messages
+        TextMessage message = session.createTextMessage("1st");
+        message.setStringProperty("color", "red");
+        producer.send(message);
+
+        Message m = consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("1st", ((TextMessage)m).getText());
+
+        // Change the subscription.
+        consumer.close();
+        consumer = session.createDurableSubscriber((Topic)destination, "test", "color='blue'", false);
+
+        message = session.createTextMessage("2nd");
+        message.setStringProperty("color", "red");
+        producer.send(message);
+        message = session.createTextMessage("3rd");
+        message.setStringProperty("color", "blue");
+        producer.send(message);
+
+        // Selector should skip the 2nd message.
+        m = consumer.receive(1000);
+        assertNotNull(m);
+        assertEquals("3rd", ((TextMessage)m).getText());
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    //
+    // TODO: find out why this test is failing on a stompjms connection.
+    //
+    public void initCombosForTestSendReceiveBytesMessage() {
+        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
+                                                              DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
+    }
+
+    public void testSendReceiveBytesMessage() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        BytesMessage message = session.createBytesMessage();
+        message.writeBoolean(true);
+        message.writeBoolean(false);
+        producer.send(message);
+
+        // Make sure only 1 message was delivered.
+        BytesMessage m = (BytesMessage)consumer.receive(1000);
+        assertNotNull(m);
+        assertTrue(m.readBoolean());
+        assertFalse(m.readBoolean());
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    //
+    // TODO: find out why this test is failing on a stompjms connection.
+    //
+    public void initCombosForTestSetMessageListenerAfterStart() {
+        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
+                                                              DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
+    }
+
+    public void testSetMessageListenerAfterStart() throws Exception {
+
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        // See if the message get sent to the listener
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                counter.incrementAndGet();
+                if (counter.get() == 4) {
+                    done.countDown();
+                }
+            }
+        });
+
+        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    //
+    // TODO: find out why this test is failing on a stompjms connection.
+    //
+    public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
+        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
+        addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE});
+    }
+
+    public void testPassMessageListenerIntoCreateConsumer() throws Exception {
+
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Receive a message with the JMS API
+        connection.start();
+        ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination, new MessageListener() {
+            public void onMessage(Message m) {
+                counter.incrementAndGet();
+                if (counter.get() == 4) {
+                    done.countDown();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
+        setCombinationValues("protocol",  new Object[] {new OpenwireBrokerProtocol()});
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+        addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
+    }
+
+    //
+    // TODO: support setting the prefetch policy on a stompjms connection.
+    // TODO: find out why this test is failing
+    //
+    /**
+     * Checks that a message handed to a listener, but not acknowledged before the connection
+     * is closed from inside onMessage(), is delivered again to a consumer on a new connection.
+     * <p>
+     * Four messages are sent. The first listener handles two of them and closes the
+     * connection while processing the second. A second connection then consumes what is left
+     * on the destination, and the shared counter is expected to end at 5.
+     * <p>
+     * NOTE(review): the method is named ignore... rather than test..., presumably so the
+     * JUnit runner skips it while the failure mentioned in the TODO is investigated - confirm.
+     *
+     * @throws Exception if a JMS call fails or a latch wait is interrupted
+     */
+    public void ignoreMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
+
+        // counter is shared by both listeners; it counts every onMessage() invocation.
+        final AtomicInteger counter = new AtomicInteger(0);
+        // sendDone: released by the main thread once all messages have been sent.
+        final CountDownLatch sendDone = new CountDownLatch(1);
+        // got2Done: released by the first listener after it has closed the connection.
+        final CountDownLatch got2Done = new CountDownLatch(1);
+
+        // Set prefetch to 1
+        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        // This test case does not work if optimized message dispatch is used, because
+        // the sending (main) thread would block until the consumer has received the
+        // message. The test depends on thread decoupling so that the main
+        // thread can stop the consumer thread.
+        ((ActiveMQConnection)connection).setOptimizedMessageDispatch(false);
+        connection.start();
+
+        // The session uses the ackMode field (presumably populated from the combination
+        // values registered for this scenario).
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in first listener: " + tm.getText());
+                    // The message text is expected to equal the number of messages seen so far.
+                    assertEquals("" + counter.get(), tm.getText());
+                    counter.incrementAndGet();
+                    if (counter.get() == 2) {
+                        // Wait until the producer side is finished, then close the
+                        // connection from within the listener, before acknowledging.
+                        sendDone.await();
+                        connection.close();
+                        got2Done.countDown();
+                    }
+                    // NOTE(review): for the second message this runs after connection.close();
+                    // anything it throws is swallowed by the catch below.
+                    tm.acknowledge();
+                } catch (Throwable e) {
+                    // Failures here (including assertion errors) are only printed; they do
+                    // not propagate to the test thread.
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+        sendDone.countDown();
+
+        // Wait for first 2 messages to arrive.
+        assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+        // Re-start connection.
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+
+        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Pickup the remaining messages.
+        final CountDownLatch done2 = new CountDownLatch(1);
+        session = connection.createSession(false, ackMode);
+        consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in second listener: " + tm.getText());
+                    // order is not guaranteed as the connection is started before the listener is set.
+                    // assertEquals("" + counter.get(), tm.getText());
+                    counter.incrementAndGet();
+                    if (counter.get() == 4) {
+                        done2.countDown();
+                    }
+                } catch (Throwable e) {
+                    LOG.error("unexpected ex onMessage: ", e);
+                }
+            }
+        });
+
+        // NOTE(review): done2 trips when the counter reaches 4, yet 5 is asserted below, so
+        // the fifth delivery has to arrive during the 200 ms sleep - timing dependent.
+        assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack and dups_ok mode
+        assertEquals(5, counter.get());
+    }
+
+    /**
+     * Registers the parameter values for the MessageListenerAutoAckOnCloseWithPrefetch1
+     * scenario: the OpenWire protocol, both delivery modes, AUTO_ACKNOWLEDGE and
+     * CLIENT_ACKNOWLEDGE, and queue destinations.
+     * <p>
+     * NOTE(review): presumably looked up by the combination test support through the
+     * initCombosFor&lt;TestName&gt; naming convention - confirm against the base class.
+     */
+    public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
+        final Object[] protocols = {new OpenwireBrokerProtocol()};
+        setCombinationValues("protocol", protocols);
+
+        final Object[] deliveryModes = {
+            Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+            Integer.valueOf(DeliveryMode.PERSISTENT)
+        };
+        addCombinationValues("deliveryMode", deliveryModes);
+
+        final Object[] ackModes = {
+            Integer.valueOf(Session.AUTO_ACKNOWLEDGE),
+            Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)
+        };
+        addCombinationValues("ackMode", ackModes);
+
+        final Object[] destinationTypes = {DestinationType.QUEUE_TYPE};
+        addCombinationValues("destinationType", destinationTypes);
+    }
+
+    //
+    // TODO: support setting the prefetch policy on a stompjms connection.
+    // TODO: find out why this test is failing
+    //
+    /**
+     * Checks that when a listener acknowledges each message and then closes the connection
+     * from inside onMessage(), no message is delivered twice: four messages are sent and the
+     * shared counter is expected to end at exactly 4 across two connections.
+     * <p>
+     * NOTE(review): the method is named ignore... rather than test..., presumably so the
+     * JUnit runner skips it while the failure mentioned in the TODO is investigated - confirm.
+     *
+     * @throws Exception if a JMS call fails or a latch wait is interrupted
+     */
+    public void ignoreMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
+
+        // counter is shared by both listeners; it counts every onMessage() invocation.
+        final AtomicInteger counter = new AtomicInteger(0);
+        // sendDone: released by the main thread once all messages have been sent.
+        final CountDownLatch sendDone = new CountDownLatch(1);
+        // got2Done: released by the first listener after it has closed the connection.
+        final CountDownLatch got2Done = new CountDownLatch(1);
+
+        // Set prefetch to 1
+        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        // This test case does not work if optimized message dispatch is used, because
+        // the sending (main) thread would block until the consumer has received the
+        // message. The test depends on thread decoupling so that the main
+        // thread can stop the consumer thread.
+        ((ActiveMQConnection)connection).setOptimizedMessageDispatch(false);
+        connection.start();
+
+        // The session uses the ackMode field (presumably populated from the combination
+        // values registered for this scenario).
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in first listener: " + tm.getText());
+                    // The message text is expected to equal the number of messages seen so far.
+                    assertEquals("" + counter.get(), tm.getText());
+                    counter.incrementAndGet();
+                    // Acknowledge before a possible close, so the message is acked while
+                    // the connection is still open.
+                    m.acknowledge();
+                    if (counter.get() == 2) {
+                        sendDone.await();
+                        connection.close();
+                        got2Done.countDown();
+                    }
+                } catch (Throwable e) {
+                    // Failures here (including assertion errors) are only printed; they do
+                    // not propagate to the test thread.
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+        sendDone.countDown();
+
+        // Wait for first 2 messages to arrive.
+        assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+        // Re-start connection.
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+
+        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Pickup the remaining messages.
+        final CountDownLatch done2 = new CountDownLatch(1);
+        session = connection.createSession(false, ackMode);
+        consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in second listener: " + tm.getText());
+                    counter.incrementAndGet();
+                    if (counter.get() == 4) {
+                        done2.countDown();
+                    }
+                } catch (Throwable e) {
+                    LOG.error("unexpected ex onMessage: ", e);
+                }
+            }
+        });
+
+        // Wait for the fourth delivery, then leave a short window in which an unexpected
+        // extra delivery would still bump the counter.
+        assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // close from onMessage with Auto_ack will ack
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    /**
+     * Registers the parameter values for testMessageListenerWithConsumerWithPrefetch1: the
+     * OpenWire protocol, both delivery modes and all four destination types.
+     * <p>
+     * NOTE(review): presumably looked up by the combination test support through the
+     * initCombosFor&lt;TestName&gt; naming convention - confirm against the base class.
+     */
+    public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
+        final Object[] protocols = {new OpenwireBrokerProtocol()};
+        setCombinationValues("protocol", protocols);
+
+        final Object[] deliveryModes = {
+            Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+            Integer.valueOf(DeliveryMode.PERSISTENT)
+        };
+        addCombinationValues("deliveryMode", deliveryModes);
+
+        final Object[] destinationTypes = {
+            DestinationType.QUEUE_TYPE,
+            DestinationType.TOPIC_TYPE,
+            DestinationType.TEMP_QUEUE_TYPE,
+            DestinationType.TEMP_TOPIC_TYPE
+        };
+        addCombinationValues("destinationType", destinationTypes);
+    }
+
+    //
+    // TODO: support setting the prefetch policy on a stompjms connection.
+    //
+    /**
+     * Verifies that a message listener on a consumer limited to a prefetch of 1 receives
+     * exactly the four messages that were sent - no fewer and, after a short grace period,
+     * no more.
+     *
+     * @throws Exception if a JMS call fails or the latch wait is interrupted
+     */
+    public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
+
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Limit the prefetch to a single message before delivery is started.
+        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                // Decide on the value returned by the increment itself instead of a second,
+                // separate read of the counter, so the latch trips exactly on delivery #4.
+                if (counter.incrementAndGet() == 4) {
+                    done.countDown();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        // Wait for the fourth delivery, then leave a short window in which an unexpected
+        // extra delivery would still bump the counter.
+        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    // TODO: figure out why this is failing with the stompjms connections
+    /**
+     * Registers the parameter values for testMessageListenerWithConsumer: the OpenWire
+     * protocol, both delivery modes and all four destination types.
+     * <p>
+     * NOTE(review): presumably looked up by the combination test support through the
+     * initCombosFor&lt;TestName&gt; naming convention - confirm against the base class.
+     */
+    public void initCombosForTestMessageListenerWithConsumer() {
+        final Object[] protocols = {new OpenwireBrokerProtocol()};
+        setCombinationValues("protocol", protocols);
+
+        final Object[] deliveryModes = {
+            Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+            Integer.valueOf(DeliveryMode.PERSISTENT)
+        };
+        addCombinationValues("deliveryMode", deliveryModes);
+
+        final Object[] destinationTypes = {
+            DestinationType.QUEUE_TYPE,
+            DestinationType.TOPIC_TYPE,
+            DestinationType.TEMP_QUEUE_TYPE,
+            DestinationType.TEMP_TOPIC_TYPE
+        };
+        addCombinationValues("destinationType", destinationTypes);
+    }
+
+    /**
+     * Verifies that a message listener registered on a consumer receives exactly the four
+     * messages that were sent - no fewer and, after a short grace period, no more.
+     *
+     * @throws Exception if a JMS call fails or the latch wait is interrupted
+     */
+    public void testMessageListenerWithConsumer() throws Exception {
+
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Start delivery, then attach an asynchronous listener to a fresh consumer.
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                // Decide on the value returned by the increment itself instead of a second,
+                // separate read of the counter, so the latch trips exactly on delivery #4.
+                if (counter.incrementAndGet() == 4) {
+                    done.countDown();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        // Wait for the fourth delivery, then leave a short window in which an unexpected
+        // extra delivery would still bump the counter.
+        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    /**
+     * Registers the parameter values for testUnackedWithPrefetch1StayInQueue: the OpenWire
+     * protocol, both delivery modes, the AUTO, DUPS_OK and CLIENT acknowledge modes, and
+     * queue destinations.
+     * <p>
+     * NOTE(review): presumably looked up by the combination test support through the
+     * initCombosFor&lt;TestName&gt; naming convention - confirm against the base class.
+     */
+    public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
+        final Object[] protocols = {new OpenwireBrokerProtocol()};
+        setCombinationValues("protocol", protocols);
+
+        final Object[] deliveryModes = {
+            Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+            Integer.valueOf(DeliveryMode.PERSISTENT)
+        };
+        addCombinationValues("deliveryMode", deliveryModes);
+
+        final Object[] ackModes = {
+            Integer.valueOf(Session.AUTO_ACKNOWLEDGE),
+            Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
+            Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)
+        };
+        addCombinationValues("ackMode", ackModes);
+
+        final Object[] destinationTypes = {DestinationType.QUEUE_TYPE};
+        addCombinationValues("destinationType", destinationTypes);
+    }
+
+    //
+    // TODO: support setting the prefetch policy on a stompjms connection.
+    //
+    public void testUnackedWithPrefetch1StayInQueue() throws Exception {
+
+        // Set prefetch to 1
+        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Use all the ack modes
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        // Only pick up the first 2 messages.
+        Message message = null;
+        for (int i = 0; i < 2; i++) {
+            message = consumer.receive(1000);
+            assertNotNull(message);
+        }
+        message.acknowledge();
+
+        connection.close();
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Use all the ack modes
+        session = connection.createSession(false, ackMode);
+        consumer = session.createConsumer(destination);
+
+        // Pickup the rest of the messages.
+        for (int i = 0; i < 2; i++) {
+            message = consumer.receive(1000);
+            assertNotNull(message);
+        }
+        message.acknowledge();
+        assertNull(consumer.receiveNoWait());
+
+    }
+
+    /**
+     * Registers the parameter values for the Prefetch1MessageNotDispatched scenario: the
+     * OpenWire protocol and both delivery modes.
+     * <p>
+     * NOTE(review): presumably looked up by the combination test support through the
+     * initCombosFor&lt;TestName&gt; naming convention - confirm against the base class.
+     */
+    public void initCombosForTestPrefetch1MessageNotDispatched() {
+        final Object[] protocols = {new OpenwireBrokerProtocol()};
+        setCombinationValues("protocol", protocols);
+
+        final Object[] deliveryModes = {
+            Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+            Integer.valueOf(DeliveryMode.PERSISTENT)
+        };
+        addCombinationValues("deliveryMode", deliveryModes);
+    }
+
+    //
+    // TODO: support setting the prefetch policy on a stompjms connection.
+    // TODO: find out why this test is failing
+    //
+    /**
+     * With a prefetch of 1 on the first connection, two committed messages on queue "TEST"
+     * are expected to be split: one for the first consumer and one for a second consumer
+     * created on a separate connection.
+     * <p>
+     * NOTE(review): the method is named ignore... rather than test..., presumably so the
+     * JUnit runner skips it while the failure mentioned in the TODO is investigated - confirm.
+     *
+     * @throws Exception if a JMS call fails
+     */
+    public void ignorePrefetch1MessageNotDispatched() throws Exception {
+
+        // Only the primary connection gets the prefetch limit of 1.
+        final ActiveMQConnection primary = (ActiveMQConnection)connection;
+        primary.getPrefetchPolicy().setAll(1);
+        primary.start();
+
+        final Session firstSession = connection.createSession(true, 0);
+        destination = new ActiveMQQueue("TEST");
+        final MessageConsumer firstConsumer = firstSession.createConsumer(destination);
+
+        // Publish two messages and commit so they become visible to consumers.
+        sendMessages(firstSession, destination, 2);
+        firstSession.commit();
+
+        // The first consumer's prefetch is full after one message, so the second message
+        // should be dispatched to another consumer. Create that second consumer on its own
+        // connection to check that this really happens.
+        final ActiveMQConnection secondConnection = (ActiveMQConnection)factory.createConnection();
+        secondConnection.start();
+        connections.add(secondConnection);
+        final Session secondSession = secondConnection.createSession(true, 0);
+        final MessageConsumer secondConsumer = secondSession.createConsumer(destination);
+
+        // One message for the first consumer ...
+        final Message fromFirst = firstConsumer.receive(1000);
+        assertNotNull(fromFirst);
+
+        // ... and the other one for the second consumer.
+        final Message fromSecond = secondConsumer.receive(5000);
+        assertNotNull(fromSecond);
+
+        firstSession.commit();
+        secondSession.commit();
+
+        // Nothing may be left over for the first consumer.
+        assertNull(firstConsumer.receiveNoWait());
+
+    }
+
+    /**
+     * Registers the parameter values for testDontStart: non-persistent delivery on queue
+     * and topic destinations.
+     * <p>
+     * NOTE(review): presumably looked up by the combination test support through the
+     * initCombosFor&lt;TestName&gt; naming convention - confirm against the base class.
+     */
+    public void initCombosForTestDontStart() {
+        final Object[] deliveryModes = {Integer.valueOf(DeliveryMode.NON_PERSISTENT)};
+        addCombinationValues("deliveryMode", deliveryModes);
+
+        final Object[] destinationTypes = {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE};
+        addCombinationValues("destinationType", destinationTypes);
+    }
+
+    /**
+     * Verifies that a consumer on a connection that was never started receives nothing,
+     * even though a message has been sent to its destination.
+     *
+     * @throws Exception if a JMS call fails
+     */
+    public void testDontStart() throws Exception {
+
+        // connection.start() is deliberately not called in this test.
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        final MessageConsumer idleConsumer = jmsSession.createConsumer(destination);
+
+        // Publish a single message.
+        sendMessages(jmsSession, destination, 1);
+
+        // Nothing may show up within one second.
+        final Message unexpected = idleConsumer.receive(1000);
+        assertNull(unexpected);
+    }
+
+    /**
+     * Registers the parameter values for testStartAfterSend: non-persistent delivery on
+     * queue and topic destinations.
+     * <p>
+     * NOTE(review): presumably looked up by the combination test support through the
+     * initCombosFor&lt;TestName&gt; naming convention - confirm against the base class.
+     */
+    public void initCombosForTestStartAfterSend() {
+        final Object[] deliveryModes = {Integer.valueOf(DeliveryMode.NON_PERSISTENT)};
+        addCombinationValues("deliveryMode", deliveryModes);
+
+        final Object[] destinationTypes = {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE};
+        addCombinationValues("destinationType", destinationTypes);
+    }
+
+    /**
+     * Verifies that a message sent before the connection is started is delivered once the
+     * connection is started, and that it is the only message delivered.
+     *
+     * @throws Exception if a JMS call fails
+     */
+    public void testStartAfterSend() throws Exception {
+
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        final MessageConsumer lateConsumer = jmsSession.createConsumer(destination);
+
+        // Publish a single message while the connection is still stopped.
+        sendMessages(jmsSession, destination, 1);
+
+        // Only now start the connection.
+        connection.start();
+
+        // Exactly one message must arrive.
+        final Message delivered = lateConsumer.receive(1000);
+        assertNotNull(delivered);
+        final Message extra = lateConsumer.receiveNoWait();
+        assertNull(extra);
+    }
+
+    // TODO: figure out why this is failing with the stompjms connections
+    /**
+     * Registers the parameter values for testReceiveMessageWithConsumer: the OpenWire
+     * protocol, both delivery modes and all four destination types.
+     * <p>
+     * NOTE(review): presumably looked up by the combination test support through the
+     * initCombosFor&lt;TestName&gt; naming convention - confirm against the base class.
+     */
+    public void initCombosForTestReceiveMessageWithConsumer() {
+        final Object[] protocols = {new OpenwireBrokerProtocol()};
+        setCombinationValues("protocol", protocols);
+
+        final Object[] deliveryModes = {
+            Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+            Integer.valueOf(DeliveryMode.PERSISTENT)
+        };
+        addCombinationValues("deliveryMode", deliveryModes);
+
+        final Object[] destinationTypes = {
+            DestinationType.QUEUE_TYPE,
+            DestinationType.TOPIC_TYPE,
+            DestinationType.TEMP_QUEUE_TYPE,
+            DestinationType.TEMP_TOPIC_TYPE
+        };
+        addCombinationValues("destinationType", destinationTypes);
+    }
+
+    /**
+     * Sends one message and verifies that a synchronous consumer receives it with the text
+     * "0" and that no further message follows.
+     *
+     * @throws Exception if a JMS call fails
+     */
+    public void testReceiveMessageWithConsumer() throws Exception {
+
+        // Start delivery and create a consumer on a fresh destination.
+        connection.start();
+        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        final MessageConsumer receiver = jmsSession.createConsumer(destination);
+
+        // Publish a single message.
+        sendMessages(jmsSession, destination, 1);
+
+        // Exactly one message, carrying the expected text, must arrive.
+        final Message received = receiver.receive(1000);
+        assertNotNull(received);
+        final TextMessage asText = (TextMessage)received;
+        assertEquals("0", asText.getText());
+        assertNull(receiver.receiveNoWait());
+    }
+
+    
+    public void testDupsOkConsumer() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+        destination = createDestination(session, DestinationType.QUEUE_TYPE);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        // Make sure only 4 message are delivered.
+        for( int i=0; i < 4; i++){
+            Message m = consumer.receive(1000);
+            assertNotNull(m);
+        }
+        assertNull(consumer.receive(1000));
+        
+        // Close out the consumer.. no other messages should be left on the queue.
+        consumer.close();
+        
+        consumer = session.createConsumer(destination);
+        assertNull(consumer.receive(1000));
+    }
+
+    // TODO: figure out why this is failing
+    /**
+     * Receives two messages in a transacted session, closes that session without committing,
+     * and expects both messages to reach a second transacted consumer flagged as redelivered
+     * with a JMSXDeliveryCount of 2.
+     * <p>
+     * NOTE(review): the method is named ignore... rather than test..., presumably so the
+     * JUnit runner skips it while the failure mentioned in the TODO is investigated - confirm.
+     *
+     * @throws Exception if a JMS call fails
+     */
+    public void ignoreRedispatchOfUncommittedTx() throws Exception {
+
+        connection.start();
+        final Session uncommittedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        destination = createDestination(uncommittedSession, DestinationType.QUEUE_TYPE);
+
+        sendMessages(connection, destination, 2);
+
+        // Take both messages inside a transaction that will never be committed.
+        final MessageConsumer uncommittedConsumer = uncommittedSession.createConsumer(destination);
+        for (int taken = 0; taken < 2; taken++) {
+            assertNotNull(uncommittedConsumer.receive(1000));
+        }
+
+        // Attach a second consumer while the first dispatch is still unacked/uncommitted.
+        final Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        final MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+
+        // Closing without commit is expected to roll back and redispatch to redispatchConsumer.
+        uncommittedSession.close();
+
+        final Message firstRedelivery = redispatchConsumer.receive(1000);
+        assertNotNull(firstRedelivery);
+        assertTrue("redelivered flag set", firstRedelivery.getJMSRedelivered());
+        final long firstDeliveryCount = firstRedelivery.getLongProperty("JMSXDeliveryCount");
+        assertEquals(2, firstDeliveryCount);
+
+        final Message secondRedelivery = redispatchConsumer.receive(1000);
+        assertNotNull(secondRedelivery);
+        assertTrue(secondRedelivery.getJMSRedelivered());
+        final long secondDeliveryCount = secondRedelivery.getLongProperty("JMSXDeliveryCount");
+        assertEquals(2, secondDeliveryCount);
+        redispatchSession.commit();
+
+        // After the commit nothing further may be delivered.
+        assertNull(redispatchConsumer.receive(500));
+        redispatchSession.close();
+    }
+
+    
+    // TODO: figure out why this is failing
+    /**
+     * Receives two messages in a transacted session, rolls the session back and closes it,
+     * and expects both messages to reach a second transacted consumer flagged as redelivered
+     * with a JMSXDeliveryCount of 2.
+     * <p>
+     * NOTE(review): the method is named ignore... rather than test..., presumably so the
+     * JUnit runner skips it while the failure mentioned in the TODO is investigated - confirm.
+     *
+     * @throws Exception if a JMS call fails
+     */
+    public void ignoreRedispatchOfRolledbackTx() throws Exception {
+
+        connection.start();
+        final Session rolledBackSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        destination = createDestination(rolledBackSession, DestinationType.QUEUE_TYPE);
+
+        sendMessages(connection, destination, 2);
+
+        // Take both messages inside a transaction that is rolled back below.
+        final MessageConsumer rolledBackConsumer = rolledBackSession.createConsumer(destination);
+        for (int taken = 0; taken < 2; taken++) {
+            assertNotNull(rolledBackConsumer.receive(1000));
+        }
+
+        // Attach a second consumer while the first dispatch is still unacked/uncommitted.
+        final Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        final MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+
+        rolledBackSession.rollback();
+        rolledBackSession.close();
+
+        // Both messages must come back, marked as second delivery attempts.
+        for (int redelivered = 0; redelivered < 2; redelivered++) {
+            final Message msg = redispatchConsumer.receive(1000);
+            assertNotNull(msg);
+            assertTrue(msg.getJMSRedelivered());
+            assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+        }
+        redispatchSession.commit();
+
+        // After the commit nothing further may be delivered.
+        assertNull(redispatchConsumer.receive(500));
+        redispatchSession.close();
+    }
+    
+    
+    /**
+     * Registers the parameter values for the AckOfExpired scenario: the OpenWire protocol
+     * on queue and topic destinations.
+     * <p>
+     * NOTE(review): presumably looked up by the combination test support through the
+     * initCombosFor&lt;TestName&gt; naming convention - confirm against the base class.
+     */
+    public void initCombosForTestAckOfExpired() {
+        final Object[] protocols = {new OpenwireBrokerProtocol()};
+        setCombinationValues("protocol", protocols);
+
+        final Object[] destinationTypes = {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE};
+        addCombinationValues("destinationType", destinationTypes);
+    }
+        
+    //
+    // TODO: support setting the prefetch policy on a stompjms connection.
+    //
+    // TODO: figure out why this is failing
+    /**
+     * Sends four messages with a 1000 ms time-to-live, waits 2000 ms, then sends four
+     * messages without expiry. The consumer is expected to receive only the "no expiry"
+     * messages, to report four expired messages in its statistics, and the destination is
+     * expected to show an in-flight count of 0 and a dequeue count of 8.
+     * <p>
+     * NOTE(review): the method is named ignore... rather than test..., presumably so the
+     * JUnit runner skips it while the failure mentioned in the TODO is investigated - confirm.
+     *
+     * @throws Exception if a JMS call fails or the sleep is interrupted
+     */
+    public void ignoreAckOfExpired() throws Exception {
+
+        // Left over from the ActiveMQ 5.x version of this test, which configured the same
+        // two settings through the connection URL instead of the setters used below.
+//        ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
+//        connection = fact.createActiveMQConnection();
+
+        // Prefetch of 4 matches the size of each batch sent below; acks are sent synchronously.
+        ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(4);
+        ((ActiveMQConnection)connection).setSendAcksAsync(false);
+
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+
+        // The consumer is created before anything is sent.
+        MessageConsumer consumer = session.createConsumer(destination);
+        // connection.setStatsEnabled(true);
+
+        // A separate session is used for producing.
+        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = sendSession.createProducer(destination);
+        // First batch: messages that expire after one second.
+        producer.setTimeToLive(1000);
+        final int count = 4;
+        for (int i = 0; i < count; i++) {
+            TextMessage message = sendSession.createTextMessage("" + i);
+            producer.send(message);
+        }
+
+        // let first bunch in queue expire
+        Thread.sleep(2000);
+
+        // Second batch: a time-to-live of 0, i.e. messages that never expire.
+        producer.setTimeToLive(0);
+        for (int i = 0; i < count; i++) {
+            TextMessage message = sendSession.createTextMessage("no expiry" + i);
+            producer.send(message);
+        }
+
+        ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
+
+        for(int i=0; i<count; i++) {
+            // NOTE(review): receive() has no timeout, so a missing delivery would block
+            // here instead of failing the assertion below.
+            TextMessage msg = (TextMessage) amqConsumer.receive();
+            assertNotNull(msg);
+            assertTrue("message has \"no expiry\" text: " + msg.getText(), msg.getText().contains("no expiry"));
+
+            // force an ack when there are expired messages
+            amqConsumer.acknowledge();
+        }
+        // The consumer's own statistics must account for the whole first batch as expired.
+        assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
+
+        // Broker-side counters are read through the protocol helper.
+        long t = protocol.getInflightCount(broker, destination);
+        assertEquals("Wrong inFlightCount: " + t, 0, t);
+        // 8 is presumably the 4 expired plus the 4 consumed messages - confirm.
+        t = protocol.getDequeueCount(broker, destination);
+        assertEquals("Wrong dequeue count: " + t, 8, t);
+        //assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java Tue Jul 24 13:30:11 2012
@@ -169,13 +169,6 @@ public class JmsTestBase extends Combina
     public String messageTextPrefix = "";
 
 
-    enum DestinationType {
-        QUEUE_TYPE,
-        TOPIC_TYPE,
-        TEMP_QUEUE_TYPE,
-        TEMP_TOPIC_TYPE
-    }
-
     // /////////////////////////////////////////////////////////////////
     //
     // Test support methods.

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java Tue Jul 24 13:30:11 2012
@@ -17,10 +17,12 @@
 package org.apache.activemq.apollo;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
 
 import static java.lang.String.*;
 
 import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 
 /**
  * <p>
@@ -42,4 +44,8 @@ public class OpenwireBrokerProtocol exte
         return "OpenWire";
     }
 
+    @Override // Returns the physical name reported by the given destination.
+    protected String name(Destination destination) {
+        return ((ActiveMQDestination)destination).getPhysicalName(); // the cast throws ClassCastException for anything that is not an ActiveMQDestination
+    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java Tue Jul 24 13:30:11 2012
@@ -17,8 +17,10 @@
 package org.apache.activemq.apollo;
 
 import org.fusesource.stomp.jms.StompJmsConnectionFactory;
+import org.fusesource.stomp.jms.StompJmsDestination;
 
 import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 
 import static java.lang.String.format;
 
@@ -41,4 +43,9 @@ public class StompBrokerProtocol extends
     public String toString() {
         return "STOMP";
     }
+
+    @Override // Returns the physical name reported by the given destination.
+    protected String name(Destination destination) {
+        return ((StompJmsDestination)destination).getPhysicalName(); // the cast throws ClassCastException for anything that is not a StompJmsDestination
+    }
 }