You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/24 15:30:12 UTC
svn commit: r1365040 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/test/scala/
apollo-itests/src/test/java/org/apache/activemq/apollo/
Author: chirino
Date: Tue Jul 24 13:30:11 2012
New Revision: 1365040
URL: http://svn.apache.org/viewvc?rev=1365040&view=rev
Log:
Ported the JMSConsumerTest from ActiveMQ 5.x
Added:
activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java
activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java (with props)
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java
activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java
activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java
activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Tue Jul 24 13:30:11 2012
@@ -23,47 +23,13 @@ import FileSupport._
import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.dto.{AggregateDestMetricsDTO, QueueStatusDTO, TopicStatusDTO}
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class BrokerFunSuiteSupport extends FunSuiteSupport with Logging { // with ShouldMatchers with BeforeAndAfterEach with Logging {
- var broker: Broker = null
- var port = 0
-
- def broker_config_uri = "xml:classpath:apollo.xml"
-
- def createBroker: Broker = {
- info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
- var broker = BrokerFactory.createBroker(broker_config_uri)
- broker.setTmp(basedir / "target" / "tmp")
- broker.getTmp().mkdirs()
- broker
- }
-
- override def beforeAll() = {
- super.beforeAll()
- try {
- broker = createBroker
- ServiceControl.start(broker)
- port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
- } catch {
- case e: Throwable => e.printStackTrace
- }
- }
+object BrokerTestSupport {
- override def afterAll() = {
- ServiceControl.stop(broker)
- super.afterAll()
- }
-
- def connector_port(connector: String): Option[Int] = Option(connector).map {
- id => broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(port)
+ def connector_port(broker:Broker, connector: String): Option[Int] = Option(connector).map {
+ id => broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(0)
}
- def queue_exists(name: String): Boolean = {
+ def queue_exists(broker:Broker, name: String): Boolean = {
val host = broker.default_virtual_host
host.dispatch_queue.future {
val router = host.router.asInstanceOf[LocalRouter]
@@ -71,7 +37,7 @@ class BrokerFunSuiteSupport extends FunS
}.await()
}
- def delete_queue(name: String) = {
+ def delete_queue(broker:Broker, name: String) = {
val host = broker.default_virtual_host
host.dispatch_queue.future {
val router = host.router.asInstanceOf[LocalRouter]
@@ -81,7 +47,7 @@ class BrokerFunSuiteSupport extends FunS
}.await()
}
- def topic_exists(name: String): Boolean = {
+ def topic_exists(broker:Broker, name: String): Boolean = {
val host = broker.default_virtual_host
host.dispatch_queue.future {
val router = host.router.asInstanceOf[LocalRouter]
@@ -89,7 +55,7 @@ class BrokerFunSuiteSupport extends FunS
}.await()
}
- def topic_status(name: String): TopicStatusDTO = {
+ def topic_status(broker:Broker, name: String): TopicStatusDTO = {
val host = broker.default_virtual_host
sync(host) {
val router = host.router.asInstanceOf[LocalRouter]
@@ -97,28 +63,28 @@ class BrokerFunSuiteSupport extends FunS
}
}
- def get_queue_metrics: AggregateDestMetricsDTO = {
+ def get_queue_metrics(broker:Broker): AggregateDestMetricsDTO = {
val host = broker.default_virtual_host
sync(host) {
host.get_queue_metrics
}
}
- def get_topic_metrics: AggregateDestMetricsDTO = {
+ def get_topic_metrics(broker:Broker): AggregateDestMetricsDTO = {
val host = broker.default_virtual_host
sync(host) {
host.get_topic_metrics
}
}
- def get_dsub_metrics: AggregateDestMetricsDTO = {
+ def get_dsub_metrics(broker:Broker): AggregateDestMetricsDTO = {
val host = broker.default_virtual_host
sync(host) {
host.get_dsub_metrics
}
}
- def queue_status(name: String): QueueStatusDTO = {
+ def queue_status(broker:Broker, name: String): QueueStatusDTO = {
val host = broker.default_virtual_host
sync(host) {
val router = host.router.asInstanceOf[LocalRouter]
@@ -129,7 +95,7 @@ class BrokerFunSuiteSupport extends FunS
}
}
- def dsub_status(name: String): QueueStatusDTO = {
+ def dsub_status(broker:Broker, name: String): QueueStatusDTO = {
val host = broker.default_virtual_host
sync(host) {
val router = host.router.asInstanceOf[LocalRouter]
@@ -137,9 +103,57 @@ class BrokerFunSuiteSupport extends FunS
}
}
- def webadmin_uri(scheme:String = "http") = {
+ def webadmin_uri(broker:Broker, scheme:String) = {
Option(broker.web_server).flatMap(_.uris().find(_.getScheme == scheme)).get
}
+}
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class BrokerFunSuiteSupport extends FunSuiteSupport with Logging { // with ShouldMatchers with BeforeAndAfterEach with Logging {
+ var broker: Broker = null
+ var port = 0
+
+ def broker_config_uri = "xml:classpath:apollo.xml"
+
+ def createBroker: Broker = {
+ info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
+ var broker = BrokerFactory.createBroker(broker_config_uri)
+ broker.setTmp(basedir / "target" / "tmp")
+ broker.getTmp().mkdirs()
+ broker
+ }
+
+ override def beforeAll() = {
+ super.beforeAll()
+ try {
+ broker = createBroker
+ ServiceControl.start(broker)
+ port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
+ } catch {
+ case e: Throwable => e.printStackTrace
+ }
+ }
+
+ override def afterAll() = {
+ ServiceControl.stop(broker)
+ super.afterAll()
+ }
+
+ def connector_port(connector: String) = BrokerTestSupport.connector_port(broker, connector)
+ def queue_exists(name: String) = BrokerTestSupport.queue_exists(broker, name)
+ def delete_queue(name: String) = BrokerTestSupport.delete_queue(broker, name)
+ def topic_exists(name: String) = BrokerTestSupport.topic_exists(broker, name)
+ def topic_status(name: String) = BrokerTestSupport.topic_status(broker, name)
+ def get_queue_metrics = BrokerTestSupport.get_queue_metrics(broker)
+ def get_topic_metrics = BrokerTestSupport.get_topic_metrics(broker)
+ def get_dsub_metrics = BrokerTestSupport.get_dsub_metrics(broker)
+ def queue_status(name: String) = BrokerTestSupport.queue_status(broker, name)
+ def dsub_status(name: String) = BrokerTestSupport.dsub_status(broker, name)
+ def webadmin_uri(scheme:String = "http") = BrokerTestSupport.webadmin_uri(broker, scheme)
def json(value:Any) = org.apache.activemq.apollo.dto.JsonCodec.encode(value).ascii().toString;
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/BrokerProtocol.java Tue Jul 24 13:30:11 2012
@@ -18,11 +18,17 @@ package org.apache.activemq.apollo;
import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.BrokerFactory;
+import org.apache.activemq.apollo.broker.BrokerFunSuiteSupport;
+import org.apache.activemq.apollo.broker.BrokerTestSupport;
+import org.apache.activemq.apollo.dto.DestMetricsDTO;
+import org.apache.activemq.apollo.dto.QueueStatusDTO;
+import org.apache.activemq.apollo.dto.TopicStatusDTO;
import org.apache.activemq.apollo.util.ServiceControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
import java.net.InetSocketAddress;
/**
@@ -51,6 +57,55 @@ abstract public class BrokerProtocol {
return address.getPort();
}
+ /** Returns the broker-side metrics of the destination, or null when no status is available for it. */
+ private DestMetricsDTO getMetrics(Broker broker, Destination destination) {
+ final DestinationType type = DestinationType.of(destination);
+ if( type == null ) {
+ return null; // neither a Queue nor a Topic; switching on a null enum would throw
+ }
+ switch (type) {
+ case QUEUE_TYPE:
+ case TEMP_QUEUE_TYPE: {
+ final QueueStatusDTO dto = BrokerTestSupport.queue_status(broker, name(destination));
+ return dto != null ? dto.metrics : null; // return here so queues never fall into the topic lookup
+ }
+ case TOPIC_TYPE: {
+ final TopicStatusDTO dto = BrokerTestSupport.topic_status(broker, name(destination));
+ return dto != null ? dto.metrics : null;
+ }
+ default:
+ return null; // TEMP_TOPIC_TYPE: no status lookup is performed
+ }
+ }
+
+ public long getInflightCount(Object broker, Destination destination) {
+ DestMetricsDTO metrics = getMetrics((Broker) broker, destination);
+ if( metrics==null ) {
+ return 0;
+ }
+ return metrics.queue_size;
+ }
+
+ public long getDequeueCount(Object broker, Destination destination) {
+ DestMetricsDTO metrics = getMetrics((Broker) broker, destination);
+ if( metrics==null ) {
+ return 0;
+ }
+ return metrics.dequeue_item_counter;
+ }
+
+// protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+// String domain = "org.apache.activemq";
+// ObjectName name;
+// if (destination.isQueue()) {
+// name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
+// } else {
+// name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
+// }
+// return (DestinationViewMBean)broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
+// }
+
abstract ConnectionFactory getConnectionFactory(Object broker);
+ protected abstract String name(Destination destination);
}
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java?rev=1365040&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/DestinationType.java Tue Jul 24 13:30:11 2012
@@ -0,0 +1,33 @@
+package org.apache.activemq.apollo;
+
+import javax.jms.Destination;
+import javax.jms.*;
+
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public enum DestinationType {
+ QUEUE_TYPE,
+ TOPIC_TYPE,
+ TEMP_QUEUE_TYPE,
+ TEMP_TOPIC_TYPE;
+
+ public static DestinationType of(Destination d) {
+ if( d instanceof Queue) {
+ if( d instanceof TemporaryQueue ) {
+ return TEMP_QUEUE_TYPE;
+ } else {
+ return QUEUE_TYPE;
+ }
+ }
+ if( d instanceof Topic) {
+ if( d instanceof TemporaryTopic ) {
+ return TEMP_TOPIC_TYPE;
+ } else {
+ return TOPIC_TYPE;
+ }
+ }
+ return null;
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java?rev=1365040&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java Tue Jul 24 13:30:11 2012
@@ -0,0 +1,967 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.management.ObjectName;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test cases used to test the JMS message consumer.
+ *
+ * This test case currently has lots of tests commented out. Look for the
+ * 'TODO's. https://issues.apache.org/jira/browse/APLO-227 is tracking these
+ * issues.
+ */
+public class JMSConsumerTest extends JmsTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerTest.class);
+
+ public static Test suite() {
+ return suite(JMSConsumerTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ public Destination destination;
+ public int deliveryMode;
+ public int prefetch;
+ public int ackMode;
+ public DestinationType destinationType;
+ public boolean durableConsumer;
+
+ // TODO: figure out why this test does not work /w stompjms
+ public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
+ setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
+ DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
+ }
+
+ public void testMessageListenerWithConsumerCanBeStopped() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch done1 = new CountDownLatch(1);
+ final CountDownLatch done2 = new CountDownLatch(1);
+
+ // Receive a message with the JMS API
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ counter.incrementAndGet();
+ if (counter.get() == 1) {
+ done1.countDown();
+ }
+ if (counter.get() == 2) {
+ done2.countDown();
+ }
+ }
+ });
+
+ // Send a first message to make sure that the consumer dispatcher is
+ // running
+ sendMessages(session, destination, 1);
+ assertTrue(done1.await(1, TimeUnit.SECONDS));
+ assertEquals(1, counter.get());
+
+ // Stop the consumer.
+ consumer.stop();
+
+ // Send a message, but should not get delivered.
+ sendMessages(session, destination, 1);
+ assertFalse(done2.await(1, TimeUnit.SECONDS));
+ assertEquals(1, counter.get());
+
+ // Start the consumer, and the message should now get delivered.
+ consumer.start();
+ assertTrue(done2.await(1, TimeUnit.SECONDS));
+ assertEquals(2, counter.get());
+ }
+
+ // TODO: figure out why this test causes a OOM /w the stompjms client.
+ public void initCombosForTestMessageListenerWithConsumerCanBeStoppedConcurently() {
+ setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
+ }
+ /** Closes the consumer from a worker thread while its listener is still receiving; no thread may report an exception. */
+ public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch closeDone = new CountDownLatch(1);
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ destination = createDestination(session, DestinationType.QUEUE_TYPE);
+
+ // preload the queue
+ sendMessages(session, destination, 2000);
+ final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+ final Map<Thread, Throwable> exceptions =
+ Collections.synchronizedMap(new HashMap<Thread, Throwable>());
+
+ final class AckAndClose implements Runnable {
+ private Message message;
+ public AckAndClose(Message m) {
+ this.message = m;
+ }
+ public void run() {
+ try {
+ int count = counter.incrementAndGet();
+ if (count == 590) {
+ // close in a separate thread is ok by jms
+ consumer.close();
+ closeDone.countDown();
+ }
+ if (count % 200 == 0) {
+ // ensure there are some outstanding messages
+ // ack every 200
+ message.acknowledge();
+ }
+ } catch (Exception e) {
+ LOG.error("Exception on close or ack:", e);
+ exceptions.put(Thread.currentThread(), e);
+ }
+ }
+ };
+
+ // The default handler is JVM-wide and the pool owns threads, so both are undone in the finally block.
+ final UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ try {
+ Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Uncaught exception:", e);
+ exceptions.put(t, e);
+ }
+ });
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ // ack and close eventually in separate thread
+ executor.execute(new AckAndClose(m));
+ }
+ });
+ assertTrue(closeDone.await(20, TimeUnit.SECONDS));
+ // await possible exceptions
+ Thread.sleep(1000);
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+ } finally {
+ Thread.setDefaultUncaughtExceptionHandler(previousHandler);
+ executor.shutdown();
+ }
+ }
+
+
+ public void initCombosForTestMutiReceiveWithPrefetch1() {
+ setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
+ Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+ addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
+ DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
+ }
+ //
+ // TODO: support setting the prefetch policy on a stompjms connection.
+ // TODO: test is failing.
+ //
+ public void ignoreMutiReceiveWithPrefetch1() throws Exception {
+
+ // Set prefetch to 1
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Use all the ack modes
+ Session session = connection.createSession(false, ackMode);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Make sure 4 messages were delivered.
+ Message message = null;
+ for (int i = 0; i < 4; i++) {
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ }
+ assertNull(consumer.receiveNoWait());
+ message.acknowledge();
+ }
+
+ //
+ // TODO: find out why this test is failing on a stompjms connection.
+ //
+ public void initCombosForTestDurableConsumerSelectorChange() {
+ setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {DestinationType.TOPIC_TYPE});
+ }
+
+ public void testDurableConsumerSelectorChange() throws Exception {
+
+ // Receive a message with the JMS API
+ connection.setClientID("test");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(deliveryMode);
+ MessageConsumer consumer = session.createDurableSubscriber((Topic)destination, "test", "color='red'", false);
+
+ // Send the messages
+ TextMessage message = session.createTextMessage("1st");
+ message.setStringProperty("color", "red");
+ producer.send(message);
+
+ Message m = consumer.receive(1000);
+ assertNotNull(m);
+ assertEquals("1st", ((TextMessage)m).getText());
+
+ // Change the subscription.
+ consumer.close();
+ consumer = session.createDurableSubscriber((Topic)destination, "test", "color='blue'", false);
+
+ message = session.createTextMessage("2nd");
+ message.setStringProperty("color", "red");
+ producer.send(message);
+ message = session.createTextMessage("3rd");
+ message.setStringProperty("color", "blue");
+ producer.send(message);
+
+ // Selector should skip the 2nd message.
+ m = consumer.receive(1000);
+ assertNotNull(m);
+ assertEquals("3rd", ((TextMessage)m).getText());
+
+ assertNull(consumer.receiveNoWait());
+ }
+
+ //
+ // TODO: find out why this test is failing on a stompjms connection.
+ //
+ public void initCombosForTestSendReceiveBytesMessage() {
+ setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
+ DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
+ }
+
+ public void testSendReceiveBytesMessage() throws Exception {
+
+ // Receive a message with the JMS API
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBoolean(true);
+ message.writeBoolean(false);
+ producer.send(message);
+
+ // Make sure only 1 message was delivered.
+ BytesMessage m = (BytesMessage)consumer.receive(1000);
+ assertNotNull(m);
+ assertTrue(m.readBoolean());
+ assertFalse(m.readBoolean());
+
+ assertNull(consumer.receiveNoWait());
+ }
+
+ //
+ // TODO: find out why this test is failing on a stompjms connection.
+ //
+ public void initCombosForTestSetMessageListenerAfterStart() {
+ setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE,
+ DestinationType.TEMP_QUEUE_TYPE, DestinationType.TEMP_TOPIC_TYPE});
+ }
+
+ public void testSetMessageListenerAfterStart() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch done = new CountDownLatch(1);
+
+ // Receive a message with the JMS API
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // See if the message get sent to the listener
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done.countDown();
+ }
+ }
+ });
+
+ assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+ Thread.sleep(200);
+
+ // Make sure only 4 messages were delivered.
+ assertEquals(4, counter.get());
+ }
+
+ //
+ // TODO: find out why this test is failing on a stompjms connection.
+ //
+ public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
+ setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
+ addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE});
+ }
+
+ public void testPassMessageListenerIntoCreateConsumer() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch done = new CountDownLatch(1);
+
+ // Receive a message with the JMS API
+ connection.start();
+ ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination, new MessageListener() {
+ public void onMessage(Message m) {
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done.countDown();
+ }
+ }
+ });
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+ Thread.sleep(200);
+
+ // Make sure only 4 messages were delivered.
+ assertEquals(4, counter.get());
+ }
+
+ public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
+ setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+ addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
+ }
+
+ //
+ // TODO: support setting the prefetch policy on a stompjms connection.
+ // TODO: find out why this test is failing
+ //
+ public void ignoreMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch sendDone = new CountDownLatch(1);
+ final CountDownLatch got2Done = new CountDownLatch(1);
+
+ // Set prefetch to 1
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ // This test case does not work if optimized message dispatch is used as
+ // the main thread send block until the consumer receives the
+ // message. This test depends on thread decoupling so that the main
+ // thread can stop the consumer thread.
+ ((ActiveMQConnection)connection).setOptimizedMessageDispatch(false);
+ connection.start();
+
+ // Use all the ack modes
+ Session session = connection.createSession(false, ackMode);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in first listener: " + tm.getText());
+ assertEquals("" + counter.get(), tm.getText());
+ counter.incrementAndGet();
+ if (counter.get() == 2) {
+ sendDone.await();
+ connection.close();
+ got2Done.countDown();
+ }
+ tm.acknowledge();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+ sendDone.countDown();
+
+ // Wait for first 2 messages to arrive.
+ assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+ // Re-start connection.
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Pickup the remaining messages.
+ final CountDownLatch done2 = new CountDownLatch(1);
+ session = connection.createSession(false, ackMode);
+ consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in second listener: " + tm.getText());
+ // order is not guaranteed as the connection is started before the listener is set.
+ // assertEquals("" + counter.get(), tm.getText());
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done2.countDown();
+ }
+ } catch (Throwable e) {
+ LOG.error("unexpected ex onMessage: ", e);
+ }
+ }
+ });
+
+ assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+ Thread.sleep(200);
+
+ // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack and dups_ok mode
+ assertEquals(5, counter.get());
+ }
+
+ public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
+ setCombinationValues("protocol", new Object[] {new OpenwireBrokerProtocol()});
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+ addCombinationValues("destinationType", new Object[] {DestinationType.QUEUE_TYPE});
+ }
+
+ //
+ // TODO: support setting the prefetch policy on a stompjms connection.
+ // TODO: find out why this test is failing
+ //
+ public void ignoreMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch sendDone = new CountDownLatch(1);
+ final CountDownLatch got2Done = new CountDownLatch(1);
+
+ // Set prefetch to 1
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ // This test case does not work if optimized message dispatch is used as
+ // the main thread send block until the consumer receives the
+ // message. This test depends on thread decoupling so that the main
+ // thread can stop the consumer thread.
+ ((ActiveMQConnection)connection).setOptimizedMessageDispatch(false);
+ connection.start();
+
+ // Use all the ack modes
+ Session session = connection.createSession(false, ackMode);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in first listener: " + tm.getText());
+ assertEquals("" + counter.get(), tm.getText());
+ counter.incrementAndGet();
+ m.acknowledge();
+ if (counter.get() == 2) {
+ sendDone.await();
+ connection.close();
+ got2Done.countDown();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+ sendDone.countDown();
+
+ // Wait for first 2 messages to arrive.
+ assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+ // Re-start connection.
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Pickup the remaining messages.
+ final CountDownLatch done2 = new CountDownLatch(1);
+ session = connection.createSession(false, ackMode);
+ consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in second listener: " + tm.getText());
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done2.countDown();
+ }
+ } catch (Throwable e) {
+ LOG.error("unexpected ex onMessage: ", e);
+ }
+ }
+ });
+
+ assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+ Thread.sleep(200);
+
+ // close from onMessage with Auto_ack will ack
+ // Make sure only 4 messages were delivered.
+ assertEquals(4, counter.get());
+ }
+
+ // Combination values, presumably matched by name to
+ // testMessageListenerWithConsumerWithPrefetch1: OpenWire protocol only, both
+ // delivery modes, and all four destination types.
+ public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
+ final Object[] protocols = {new OpenwireBrokerProtocol()};
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ DestinationType.QUEUE_TYPE,
+ DestinationType.TOPIC_TYPE,
+ DestinationType.TEMP_QUEUE_TYPE,
+ DestinationType.TEMP_TOPIC_TYPE
+ };
+
+ setCombinationValues("protocol", protocols);
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ //
+ // Registers a MessageListener on a connection whose prefetch policy is set to 1
+ // and asserts that the listener is invoked exactly 4 times after sendMessages(..., 4).
+ //
+ // TODO: support setting the prefetch policy on a stompjms connection.
+ //
+ public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
+
+ // counter tracks listener invocations; done is released on the 4th one.
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch done = new CountDownLatch(1);
+
+ // Limit prefetch to 1 before starting the connection
+ // (the cast requires an OpenWire ActiveMQConnection).
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done.countDown();
+ }
+ }
+ });
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Fail if the 4th delivery has not happened within one second.
+ assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+ // Short pause so that any unexpected extra delivery would still bump the counter.
+ Thread.sleep(200);
+
+ // Make sure only 4 messages were delivered.
+ assertEquals(4, counter.get());
+ }
+
+ // TODO: figure out why this is failing with the stompjms connections
+ //
+ // Combination values, presumably matched by name to testMessageListenerWithConsumer:
+ // OpenWire protocol only, both delivery modes, and all four destination types.
+ public void initCombosForTestMessageListenerWithConsumer() {
+ final Object[] protocols = {new OpenwireBrokerProtocol()};
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ DestinationType.QUEUE_TYPE,
+ DestinationType.TOPIC_TYPE,
+ DestinationType.TEMP_QUEUE_TYPE,
+ DestinationType.TEMP_TOPIC_TYPE
+ };
+
+ setCombinationValues("protocol", protocols);
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ // Registers a MessageListener on an AUTO_ACKNOWLEDGE session and asserts that it
+ // is invoked exactly 4 times after sendMessages(..., 4).
+ public void testMessageListenerWithConsumer() throws Exception {
+
+ // counter tracks listener invocations; done is released on the 4th one.
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch done = new CountDownLatch(1);
+
+ // Start the connection and attach a listener-based consumer
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done.countDown();
+ }
+ }
+ });
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Fail if the 4th delivery has not happened within one second.
+ assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+ // Short pause so that any unexpected extra delivery would still bump the counter.
+ Thread.sleep(200);
+
+ // Make sure only 4 messages were delivered.
+ assertEquals(4, counter.get());
+ }
+
+ // Combination values, presumably matched by name to testUnackedWithPrefetch1StayInQueue:
+ // OpenWire protocol only, both delivery modes, three ack modes, queues only.
+ public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
+ final Object[] protocols = {new OpenwireBrokerProtocol()};
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] ackModes = {
+ Integer.valueOf(Session.AUTO_ACKNOWLEDGE),
+ Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
+ Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)
+ };
+ final Object[] destinationTypes = {DestinationType.QUEUE_TYPE};
+
+ setCombinationValues("protocol", protocols);
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("ackMode", ackModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ //
+ // With prefetch set to 1, consumes 2 of 4 sent messages, closes the connection,
+ // and asserts that a consumer on a new connection receives exactly 2 more messages.
+ //
+ // TODO: support setting the prefetch policy on a stompjms connection.
+ //
+ public void testUnackedWithPrefetch1StayInQueue() throws Exception {
+
+ // Set prefetch to 1
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Use the ack mode selected for this combination
+ Session session = connection.createSession(false, ackMode);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Only pick up the first 2 messages.
+ Message message = null;
+ for (int i = 0; i < 2; i++) {
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ }
+ // Only the last received message is acknowledged explicitly.
+ // NOTE(review): presumably this relies on JMS acknowledge() covering all messages
+ // consumed so far in the session - confirm for CLIENT_ACKNOWLEDGE.
+ message.acknowledge();
+
+ // Drop the first connection and reconnect with the same prefetch setting.
+ connection.close();
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Use the ack mode selected for this combination
+ session = connection.createSession(false, ackMode);
+ consumer = session.createConsumer(destination);
+
+ // Pickup the rest of the messages.
+ for (int i = 0; i < 2; i++) {
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ }
+ message.acknowledge();
+ // Nothing else may be immediately available after the 4th message.
+ assertNull(consumer.receiveNoWait());
+
+ }
+
+ // Combination values for the prefetch-1 "message not dispatched" scenario:
+ // OpenWire protocol only, both delivery modes.
+ public void initCombosForTestPrefetch1MessageNotDispatched() {
+ final Object[] protocols = {new OpenwireBrokerProtocol()};
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+
+ setCombinationValues("protocol", protocols);
+ addCombinationValues("deliveryMode", deliveryModes);
+ }
+
+ //
+ // With prefetch set to 1 on the first connection, sends 2 messages to queue "TEST"
+ // and expects the first consumer and a second consumer (on another connection)
+ // to receive one message each.
+ // Presumably disabled through the "ignore" name prefix (active tests here are named test*).
+ //
+ // TODO: support setting the prefetch policy on a stompjms connection.
+ // TODO: find out why this test is failing
+ //
+ public void ignorePrefetch1MessageNotDispatched() throws Exception {
+
+ // Set prefetch to 1
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Transacted session on a fixed queue name rather than createDestination(...).
+ Session session = connection.createSession(true, 0);
+ destination = new ActiveMQQueue("TEST");
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send 2 messages to the destination.
+ sendMessages(session, destination, 2);
+ session.commit();
+
+ // The prefetch should fill up with 1 message.
+ // Since the prefetch is then full, the 2nd message should get dispatched
+ // to another consumer. Create a 2nd consumer on a separate connection
+ // to make sure that it does.
+ ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection();
+ connection2.start();
+ connections.add(connection2);
+ Session session2 = connection2.createSession(true, 0);
+ MessageConsumer consumer2 = session2.createConsumer(destination);
+
+ // Pick up the first message.
+ Message message1 = consumer.receive(1000);
+ assertNotNull(message1);
+
+ // Pick up the 2nd message.
+ Message message2 = consumer2.receive(5000);
+ assertNotNull(message2);
+
+ // Commit both transacted sessions.
+ session.commit();
+ session2.commit();
+
+ // The first consumer must not have a further message available.
+ assertNull(consumer.receiveNoWait());
+
+ }
+
+ // Combination values, presumably matched by name to testDontStart:
+ // non-persistent delivery on queues and topics.
+ public void initCombosForTestDontStart() {
+ final Object[] deliveryModes = {Integer.valueOf(DeliveryMode.NON_PERSISTENT)};
+ final Object[] destinationTypes = {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE};
+
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ // Asserts that a consumer receives nothing when connection.start() is never
+ // called: receive(1000) must return null after one message has been sent.
+ public void testDontStart() throws Exception {
+ final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(jmsSession, destinationType);
+ final MessageConsumer receiver = jmsSession.createConsumer(destination);
+
+ // Publish a single message; the connection is not started in this test.
+ sendMessages(jmsSession, destination, 1);
+
+ // Nothing may be delivered within the one-second wait.
+ final Message delivered = receiver.receive(1000);
+ assertNull(delivered);
+ }
+
+ // Combination values, presumably matched by name to testStartAfterSend:
+ // non-persistent delivery on queues and topics.
+ public void initCombosForTestStartAfterSend() {
+ final Object[] deliveryModes = {Integer.valueOf(DeliveryMode.NON_PERSISTENT)};
+ final Object[] destinationTypes = {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE};
+
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ // Asserts that a message sent before connection.start() is delivered once the
+ // connection is started, and that no second message follows.
+ public void testStartAfterSend() throws Exception {
+ final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(jmsSession, destinationType);
+ final MessageConsumer receiver = jmsSession.createConsumer(destination);
+
+ // Publish a single message while the connection is still stopped.
+ sendMessages(jmsSession, destination, 1);
+
+ // Start the connection only after the message was sent.
+ connection.start();
+
+ // Exactly one message must arrive.
+ final Message first = receiver.receive(1000);
+ assertNotNull(first);
+ final Message second = receiver.receiveNoWait();
+ assertNull(second);
+ }
+
+ // TODO: figure out why this is failing with the stompjms connections
+ //
+ // Combination values, presumably matched by name to testReceiveMessageWithConsumer:
+ // OpenWire protocol only, both delivery modes, and all four destination types.
+ public void initCombosForTestReceiveMessageWithConsumer() {
+ final Object[] protocols = {new OpenwireBrokerProtocol()};
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ DestinationType.QUEUE_TYPE,
+ DestinationType.TOPIC_TYPE,
+ DestinationType.TEMP_QUEUE_TYPE,
+ DestinationType.TEMP_TOPIC_TYPE
+ };
+
+ setCombinationValues("protocol", protocols);
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ // Asserts that a synchronous consumer receives exactly one message whose text
+ // is "0" after one message has been sent.
+ public void testReceiveMessageWithConsumer() throws Exception {
+ // Start the connection and create a synchronous consumer.
+ connection.start();
+ final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(jmsSession, destinationType);
+ final MessageConsumer receiver = jmsSession.createConsumer(destination);
+
+ // Publish a single message.
+ sendMessages(jmsSession, destination, 1);
+
+ // Exactly one message, carrying the text "0", must arrive.
+ final Message first = receiver.receive(1000);
+ assertNotNull(first);
+ final TextMessage firstText = (TextMessage) first;
+ assertEquals("0", firstText.getText());
+ final Message second = receiver.receiveNoWait();
+ assertNull(second);
+ }
+
+
+ public void testDupsOkConsumer() throws Exception {
+
+ // Receive a message with the JMS API
+ connection.start();
+ Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ destination = createDestination(session, DestinationType.QUEUE_TYPE);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Make sure only 4 message are delivered.
+ for( int i=0; i < 4; i++){
+ Message m = consumer.receive(1000);
+ assertNotNull(m);
+ }
+ assertNull(consumer.receive(1000));
+
+ // Close out the consumer.. no other messages should be left on the queue.
+ consumer.close();
+
+ consumer = session.createConsumer(destination);
+ assertNull(consumer.receive(1000));
+ }
+
+ // Receives 2 messages in a transacted session, closes that session without
+ // committing, and expects both messages to be redelivered to a second transacted
+ // consumer with the redelivered flag set and JMSXDeliveryCount == 2.
+ // Presumably disabled through the "ignore" name prefix (active tests here are named test*).
+ //
+ // TODO: figure out why this is failing
+ public void ignoreRedispatchOfUncommittedTx() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ destination = createDestination(session, DestinationType.QUEUE_TYPE);
+
+ // Send via the connection-based sendMessages overload, not via the transacted session.
+ sendMessages(connection, destination, 2);
+
+ // Consume both messages but leave the transaction uncommitted.
+ MessageConsumer consumer = session.createConsumer(destination);
+ assertNotNull(consumer.receive(1000));
+ assertNotNull(consumer.receive(1000));
+
+ // install another consumer while message dispatch is unacked/uncommitted
+ Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+
+ // no commit so will auto rollback and get re-dispatched to redispatchConsumer
+ session.close();
+
+ // First redelivered message.
+ Message msg = redispatchConsumer.receive(1000);
+ assertNotNull(msg);
+ assertTrue("redelivered flag set", msg.getJMSRedelivered());
+ assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+
+ // Second redelivered message.
+ msg = redispatchConsumer.receive(1000);
+ assertNotNull(msg);
+ assertTrue(msg.getJMSRedelivered());
+ assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+ redispatchSession.commit();
+
+ // After the commit nothing further may be delivered.
+ assertNull(redispatchConsumer.receive(500));
+ redispatchSession.close();
+ }
+
+
+ // Receives 2 messages in a transacted session, rolls the session back and closes
+ // it, and expects both messages to be redelivered to a second transacted consumer
+ // with the redelivered flag set and JMSXDeliveryCount == 2.
+ // Presumably disabled through the "ignore" name prefix (active tests here are named test*).
+ //
+ // TODO: figure out why this is failing
+ public void ignoreRedispatchOfRolledbackTx() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ destination = createDestination(session, DestinationType.QUEUE_TYPE);
+
+ // Send via the connection-based sendMessages overload, not via the transacted session.
+ sendMessages(connection, destination, 2);
+
+ // Consume both messages inside the transaction.
+ MessageConsumer consumer = session.createConsumer(destination);
+ assertNotNull(consumer.receive(1000));
+ assertNotNull(consumer.receive(1000));
+
+ // install another consumer while message dispatch is unacked/uncommitted
+ Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+
+ // Explicit rollback before closing the first session.
+ session.rollback();
+ session.close();
+
+ // First redelivered message.
+ Message msg = redispatchConsumer.receive(1000);
+ assertNotNull(msg);
+ assertTrue(msg.getJMSRedelivered());
+ assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+ // Second redelivered message.
+ msg = redispatchConsumer.receive(1000);
+ assertNotNull(msg);
+ assertTrue(msg.getJMSRedelivered());
+ assertEquals(2, msg.getLongProperty("JMSXDeliveryCount"));
+ redispatchSession.commit();
+
+ // After the commit nothing further may be delivered.
+ assertNull(redispatchConsumer.receive(500));
+ redispatchSession.close();
+ }
+
+
+ // Combination values for the ack-of-expired scenario: OpenWire protocol only,
+ // on queues and topics.
+ public void initCombosForTestAckOfExpired() {
+ final Object[] protocols = {new OpenwireBrokerProtocol()};
+ final Object[] destinationTypes = {DestinationType.QUEUE_TYPE, DestinationType.TOPIC_TYPE};
+
+ setCombinationValues("protocol", protocols);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ //
+ // Sends 4 messages with a 1000 ms time-to-live, waits 2000 ms so they expire, then
+ // sends 4 non-expiring messages and expects the consumer to receive only the
+ // "no expiry" ones while its stats count 4 expired messages.
+ // Presumably disabled through the "ignore" name prefix (active tests here are named test*).
+ //
+ // TODO: support setting the prefetch policy on a stompjms connection.
+ //
+ // TODO: figure out why this is failing
+ public void ignoreAckOfExpired() throws Exception {
+
+// ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
+// connection = fact.createActiveMQConnection();
+
+ // Same settings as the commented-out URL above, applied to the existing connection.
+ ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(4);
+ ((ActiveMQConnection)connection).setSendAcksAsync(false);
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ // connection.setStatsEnabled(true);
+
+ // Produce from a separate session: first batch expires after one second.
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(destination);
+ producer.setTimeToLive(1000);
+ final int count = 4;
+ for (int i = 0; i < count; i++) {
+ TextMessage message = sendSession.createTextMessage("" + i);
+ producer.send(message);
+ }
+
+ // let first bunch in queue expire
+ Thread.sleep(2000);
+
+ // Second batch: time-to-live 0, text prefixed with "no expiry".
+ producer.setTimeToLive(0);
+ for (int i = 0; i < count; i++) {
+ TextMessage message = sendSession.createTextMessage("no expiry" + i);
+ producer.send(message);
+ }
+
+ // Needs the ActiveMQ consumer type for acknowledge() and getConsumerStats().
+ ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
+
+ for(int i=0; i<count; i++) {
+ // receive() is called without a timeout here.
+ TextMessage msg = (TextMessage) amqConsumer.receive();
+ assertNotNull(msg);
+ assertTrue("message has \"no expiry\" text: " + msg.getText(), msg.getText().contains("no expiry"));
+
+ // force an ack when there are expired messages
+ amqConsumer.acknowledge();
+ }
+ assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
+
+ // Broker-side counters read through the protocol helper: nothing in flight,
+ // and all 8 messages (4 expired + 4 consumed) dequeued.
+ long t = protocol.getInflightCount(broker, destination);
+ assertEquals("Wrong inFlightCount: " + t, 0, t);
+ t = protocol.getDequeueCount(broker, destination);
+ assertEquals("Wrong dequeue count: " + t, 8, t);
+ //assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
+ }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JMSConsumerTest.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/JmsTestBase.java Tue Jul 24 13:30:11 2012
@@ -169,13 +169,6 @@ public class JmsTestBase extends Combina
public String messageTextPrefix = "";
- enum DestinationType {
- QUEUE_TYPE,
- TOPIC_TYPE,
- TEMP_QUEUE_TYPE,
- TEMP_TOPIC_TYPE
- }
-
// /////////////////////////////////////////////////////////////////
//
// Test support methods.
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/OpenwireBrokerProtocol.java Tue Jul 24 13:30:11 2012
@@ -17,10 +17,12 @@
package org.apache.activemq.apollo;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
import static java.lang.String.*;
import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
/**
* <p>
@@ -42,4 +44,8 @@ public class OpenwireBrokerProtocol exte
return "OpenWire";
}
+ @Override
+ protected String name(Destination destination) {
+ return ((ActiveMQDestination)destination).getPhysicalName();
+ }
}
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java?rev=1365040&r1=1365039&r2=1365040&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/StompBrokerProtocol.java Tue Jul 24 13:30:11 2012
@@ -17,8 +17,10 @@
package org.apache.activemq.apollo;
import org.fusesource.stomp.jms.StompJmsConnectionFactory;
+import org.fusesource.stomp.jms.StompJmsDestination;
import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
import static java.lang.String.format;
@@ -41,4 +43,9 @@ public class StompBrokerProtocol extends
public String toString() {
return "STOMP";
}
+
+ @Override
+ protected String name(Destination destination) {
+ return ((StompJmsDestination)destination).getPhysicalName();
+ }
}