You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/23 21:49:24 UTC
svn commit: r1364773 - in /activemq/activemq-apollo/trunk/apollo-itests: ./
src/test/java/org/apache/activemq/apollo/ src/test/resources/
Author: chirino
Date: Mon Jul 23 19:49:23 2012
New Revision: 1364773
URL: http://svn.apache.org/viewvc?rev=1364773&view=rev
Log:
Ported over the ProducerFlowControlTest classes from ActiveMQ 5
Added:
activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java
activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java
Modified:
activemq/activemq-apollo/trunk/apollo-itests/pom.xml
activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml
Modified: activemq/activemq-apollo/trunk/apollo-itests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/pom.xml?rev=1364773&r1=1364772&r2=1364773&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/pom.xml Mon Jul 23 19:49:23 2012
@@ -56,6 +56,19 @@
</dependency>
<!-- So we can boot up a broker -->
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>${jackson-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${jackson-version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>apollo-stomp</artifactId>
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java?rev=1364773&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java Mon Jul 23 19:49:23 2012
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
+
+ /**
+  * Builds the JUnit 3 suite for this class.
+  *
+  * @return the suite created by the {@code suite(Class)} helper for
+  *         {@code ProducerFlowControlSendFailTest}
+  */
+ public static Test suite() {
+     final Test allTests = suite(ProducerFlowControlSendFailTest.class);
+     return allTests;
+ }
+
+ /**
+  * Command-line entry point: runs this class's suite with the JUnit text runner.
+  *
+  * @param args ignored
+  */
+ public static void main(String[] args) {
+     final Test allTests = suite();
+     junit.textui.TestRunner.run(allTests);
+ }
+
+// protected BrokerService createBroker() throws Exception {
+// BrokerService service = new BrokerService();
+// service.setPersistent(false);
+// service.setUseJmx(false);
+//
+// // Setup a destination policy where it takes only 1 message at a time.
+// PolicyMap policyMap = new PolicyMap();
+// PolicyEntry policy = new PolicyEntry();
+// policy.setMemoryLimit(1);
+// policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+// policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+// policy.setProducerFlowControl(true);
+// policyMap.setDefaultEntry(policy);
+// service.setDestinationPolicy(policyMap);
+//
+// service.getSystemUsage().setSendFailIfNoSpace(true);
+//
+// connector = service.addConnector("tcp://localhost:0");
+// return service;
+// }
+
+ /**
+  * Overrides the inherited test with an empty body so that it passes trivially
+  * in this send-fail variant of the suite.
+  */
+ @Override
+ public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+ // with sendFailIfNoSpace set, there is no blocking of the connection
+ }
+
+ /**
+  * Empty override of the inherited (currently disabled) async-publisher scenario.
+  * The method name does not start with {@code test}, so the JUnit 3 runner does
+  * not pick it up as a test.
+  */
+ @Override // re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed
+ public void ignoreAsyncPubisherRecoverAfterBlock() throws Exception {
+ // sendFail means no flow control window as there is no producer ack, just an exception
+ }
+
+ /**
+  * Floods {@code queueA} from a background "Filler" thread using async sends, waits
+  * briefly for a {@link ResourceAllocationException} to be flagged, then tries to
+  * receive and acknowledge up to ten messages.
+  * <p>
+  * Disabled: the name lacks the {@code test} prefix, so JUnit 3 does not run it.
+  * Re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed.
+  * <p>
+  * The filler thread is always told to stop, even when setup or receiving fails.
+  *
+  * @throws Exception if the connection, session, producer or consumer cannot be used
+  */
+ @Override
+ public void ignorePubisherRecoverAfterBlock() throws Exception {
+     ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+     // with sendFail, there must be no flow control window:
+     // sendFail is an alternative flow control mechanism that does not block
+     factory.setUseAsyncSend(true);
+     connection = (ActiveMQConnection)factory.createConnection();
+     connections.add(connection);
+     connection.start();
+
+     final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+     final MessageProducer producer = session.createProducer(queueA);
+
+     final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+     Thread thread = new Thread("Filler") {
+         @Override
+         public void run() {
+             while (keepGoing.get()) {
+                 try {
+                     producer.send(session.createTextMessage("Test message"));
+                     if (gotResourceException.get()) {
+                         // do not flood the broker with requests when full as we are sending async and they
+                         // will be limited by the network buffers
+                         Thread.sleep(200);
+                     }
+                 } catch (InterruptedException e) {
+                     // stop filling and keep the interrupt status visible to the caller
+                     Thread.currentThread().interrupt();
+                     return;
+                 } catch (Exception e) {
+                     // with async send, there will be no exceptions
+                     e.printStackTrace();
+                 }
+             }
+         }
+     };
+     thread.start();
+     try {
+         // The flag passed here is never reset by the filler, so this returns after
+         // at most two one-second polls, or earlier once a resource exception was seen.
+         waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+
+         // resourceException on second message, resumption if we
+         // can receive 10
+         MessageConsumer consumer = session.createConsumer(queueA);
+         TextMessage msg;
+         for (int idx = 0; idx < 10; ++idx) {
+             msg = (TextMessage) consumer.receive(1000);
+             if (msg != null) {
+                 msg.acknowledge();
+             }
+         }
+     } finally {
+         // always stop the filler thread, even when something above threw
+         keepGoing.set(false);
+     }
+ }
+
+ /**
+  * Floods {@code queueA} from a background "Filler" thread using synchronous sends and
+  * counts every {@link ResourceAllocationException} thrown by {@code send}. After a
+  * short wait it tries to receive and acknowledge up to ten messages, then asserts
+  * that the exception count is greater than 5.
+  * <p>
+  * Disabled: the name lacks the {@code test} prefix, so JUnit 3 does not run it.
+  * Re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed.
+  * <p>
+  * The filler thread is always told to stop, even when the assertion fails.
+  *
+  * @throws Exception if the connection, session, producer or consumer cannot be used
+  */
+ public void ignorePubisherRecoverAfterBlockWithSyncSend() throws Exception {
+     ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+     // exceptions are observed directly on send(), so drop the factory-level listener
+     factory.setExceptionListener(null);
+     factory.setUseAsyncSend(false);
+     connection = (ActiveMQConnection)factory.createConnection();
+     connections.add(connection);
+     connection.start();
+
+     final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+     final MessageProducer producer = session.createProducer(queueA);
+
+     final AtomicBoolean keepGoing = new AtomicBoolean(true);
+     final AtomicInteger exceptionCount = new AtomicInteger(0);
+     Thread thread = new Thread("Filler") {
+         @Override
+         public void run() {
+             while (keepGoing.get()) {
+                 try {
+                     producer.send(session.createTextMessage("Test message"));
+                 } catch (JMSException arg0) {
+                     // only resource-limit failures are counted; other JMS errors are ignored
+                     if (arg0 instanceof ResourceAllocationException) {
+                         gotResourceException.set(true);
+                         exceptionCount.incrementAndGet();
+                     }
+                 }
+             }
+         }
+     };
+     thread.start();
+     try {
+         // The flag passed here is never reset by the filler, so this returns after
+         // at most two one-second polls, or earlier once a resource exception was seen.
+         waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+
+         // resourceException on second message, resumption if we
+         // can receive 10
+         MessageConsumer consumer = session.createConsumer(queueA);
+         TextMessage msg;
+         for (int idx = 0; idx < 10; ++idx) {
+             msg = (TextMessage) consumer.receive(1000);
+             if (msg != null) {
+                 msg.acknowledge();
+             }
+         }
+         assertTrue("we were blocked at least 5 times", 5 < exceptionCount.get());
+     } finally {
+         // always stop the filler thread, even when the assertion above fails
+         keepGoing.set(false);
+     }
+ }
+
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) super.createConnectionFactory();
+ connectionFactory.setExceptionListener(new ExceptionListener() {
+ public void onException(JMSException arg0) {
+ if (arg0 instanceof ResourceAllocationException) {
+ gotResourceException.set(true);
+ }
+ }
+ });
+ return connectionFactory;
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java?rev=1364773&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java Mon Jul 23 19:49:23 2012
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProducerFlowControlTest extends JmsTestBase {
+
+ /**
+  * Builds the JUnit 3 suite for this class.
+  *
+  * @return the suite created by the {@code suite(Class)} helper for
+  *         {@code ProducerFlowControlTest}
+  */
+ public static Test suite() {
+     final Test allTests = suite(ProducerFlowControlTest.class);
+     return allTests;
+ }
+
+ /**
+  * Command-line entry point: runs this class's suite with the JUnit text runner.
+  *
+  * @param args ignored
+  */
+ public static void main(String[] args) {
+     final Test allTests = suite();
+     junit.textui.TestRunner.run(allTests);
+ }
+
+ // protected BrokerService createBroker() throws Exception {
+ // BrokerService service = new BrokerService();
+ // service.setPersistent(false);
+ // service.setUseJmx(false);
+ //
+ // // Setup a destination policy where it takes only 1 message at a time.
+ // PolicyMap policyMap = new PolicyMap();
+ // PolicyEntry policy = new PolicyEntry();
+ // policy.setMemoryLimit(1);
+ // policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+ // policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+ // policy.setProducerFlowControl(true);
+ // policyMap.setDefaultEntry(policy);
+ // service.setDestinationPolicy(policyMap);
+ //
+ // connector = service.addConnector("tcp://localhost:0");
+ // return service;
+ // }
+
+ // Logger shared by the tests in this class.
+ static final Logger LOG = LoggerFactory.getLogger(ProducerFlowControlTest.class);
+ // Test queues; the "quota-1." name prefix matches the quota-1.** queue entry
+ // (quota="1") added to the test apollo.xml in this same commit.
+ ActiveMQQueue queueA = new ActiveMQQueue("quota-1.QUEUE.A");
+ ActiveMQQueue queueB = new ActiveMQQueue("quota-1.QUEUE.B");
+ // Connection under test; created by each test and force-closed in tearDown().
+ protected ActiveMQConnection connection;
+ // used to test sendFailIfNoSpace on SystemUsage
+ protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
+
+ /**
+  * Restricts the "protocol" combination to OpenWire, because this test is only
+  * valid against the Openwire protocol.
+  */
+ public void initCombos() {
+     final Object[] protocols = {new OpenwireBrokerProtocol()};
+     setCombinationValues("protocol", protocols);
+ }
+
+ /**
+  * With a 64 KiB producer window configured on the factory, fills queue A and then
+  * checks that two sends to queue B each complete within two seconds and that both
+  * messages are received in order.
+  *
+  * @throws Exception on any JMS failure or if the waiting thread is interrupted
+  */
+ public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception {
+ ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+ factory.setProducerWindowSize(1024 * 64);
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queueB);
+
+ // Test sending to Queue A
+ // A few sends should not block until the producer window is used up.
+ fillQueue(queueA);
+
+ // Test sending to Queue B: it should not block since the connection
+ // should not be blocked.
+ CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+ assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+ // NOTE(review): receive() has no timeout; presumably setAutoFail(true) in setUp()
+ // guards against a hang here - confirm in the base class.
+ TextMessage msg = (TextMessage)consumer.receive();
+ assertEquals("Message 1", msg.getText());
+ msg.acknowledge();
+
+ pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+ assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+ msg = (TextMessage)consumer.receive();
+ assertEquals("Message 2", msg.getText());
+ msg.acknowledge();
+ }
+
+ /**
+  * Floods {@code queueA} from a background "Filler" thread until the producer appears
+  * blocked, then consumes five messages and asserts that the producer resumed sending
+  * (the filler sets {@code done} back to {@code false} before every send).
+  * <p>
+  * Disabled: the name lacks the {@code test} prefix, so JUnit 3 does not run it.
+  * Re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed.
+  * <p>
+  * The filler thread is always told to stop, even when receiving fails.
+  *
+  * @throws Exception on any JMS failure or if the waiting thread is interrupted
+  */
+ public void ignorePubisherRecoverAfterBlock() throws Exception {
+     ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+     connection = (ActiveMQConnection)factory.createConnection();
+     connections.add(connection);
+     connection.start();
+
+     final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+     final MessageProducer producer = session.createProducer(queueA);
+
+     final AtomicBoolean done = new AtomicBoolean(true);
+     final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+     Thread thread = new Thread("Filler") {
+         int i;
+         @Override
+         public void run() {
+             while (keepGoing.get()) {
+                 done.set(false);
+                 try {
+                     producer.send(session.createTextMessage("Test message " + ++i));
+                     LOG.info("sent: " + i);
+                 } catch (JMSException ignored) {
+                     // best effort: a failed send is simply retried on the next iteration
+                 }
+             }
+         }
+     };
+     thread.start();
+     try {
+         waitForBlockedOrResourceLimit(done);
+
+         // after receiving messages, producer should continue sending messages
+         // (done == false)
+         MessageConsumer consumer = session.createConsumer(queueA);
+         TextMessage msg;
+         for (int idx = 0; idx < 5; ++idx) {
+             msg = (TextMessage) consumer.receive(1000);
+             // receive(timeout) yields null on timeout; fail clearly instead of with an NPE
+             assertNotNull("Got a message", msg);
+             LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
+             msg.acknowledge();
+         }
+         // give the producer time to push at least one more message
+         Thread.sleep(1000);
+     } finally {
+         // always stop the filler thread, even when something above threw
+         keepGoing.set(false);
+     }
+
+     assertFalse("producer has resumed", done.get());
+ }
+
+ /**
+  * Async-send variant of the recover-after-block scenario: with a 5 KiB producer
+  * window, floods {@code queueA} from a background "Filler" thread until the producer
+  * appears blocked, then consumes five messages and asserts that the producer resumed
+  * sending (the filler sets {@code done} back to {@code false} before every send).
+  * <p>
+  * Disabled: the name lacks the {@code test} prefix, so JUnit 3 does not run it.
+  * Re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed.
+  * <p>
+  * The filler thread is always told to stop, even when an assertion fails.
+  *
+  * @throws Exception on any JMS failure or if the waiting thread is interrupted
+  */
+ public void ignoreAsyncPubisherRecoverAfterBlock() throws Exception {
+     ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+     factory.setProducerWindowSize(1024 * 5);
+     factory.setUseAsyncSend(true);
+     connection = (ActiveMQConnection)factory.createConnection();
+     connections.add(connection);
+     connection.start();
+
+     final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+     final MessageProducer producer = session.createProducer(queueA);
+
+     final AtomicBoolean done = new AtomicBoolean(true);
+     final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+     Thread thread = new Thread("Filler") {
+         int i;
+         @Override
+         public void run() {
+             while (keepGoing.get()) {
+                 done.set(false);
+                 try {
+                     producer.send(session.createTextMessage("Test message " + ++i));
+                     LOG.info("sent: " + i);
+                 } catch (JMSException ignored) {
+                     // best effort: a failed send is simply retried on the next iteration
+                 }
+             }
+         }
+     };
+     thread.start();
+     try {
+         waitForBlockedOrResourceLimit(done);
+
+         // after receiving messages, producer should continue sending messages
+         // (done == false)
+         MessageConsumer consumer = session.createConsumer(queueA);
+         TextMessage msg;
+         for (int idx = 0; idx < 5; ++idx) {
+             msg = (TextMessage) consumer.receive(1000);
+             assertNotNull("Got a message", msg);
+             LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
+             msg.acknowledge();
+         }
+         // give the producer time to push at least one more message
+         Thread.sleep(1000);
+     } finally {
+         // always stop the filler thread, even when an assertion above failed
+         keepGoing.set(false);
+     }
+
+     assertFalse("producer has resumed", done.get());
+ }
+
+ /**
+  * With always-sync-send enabled on the factory, fills queue A and then checks that
+  * two sends to queue B each complete within two seconds and that both messages are
+  * received in order.
+  * <p>
+  * Disabled: the name lacks the {@code test} prefix, so JUnit 3 does not run it.
+  *
+  * @throws Exception on any JMS failure or if the waiting thread is interrupted
+  */
+ // re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed
+ public void ignore2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
+ ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+ factory.setAlwaysSyncSend(true);
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queueB);
+
+ // Test sending to Queue A
+ // 1st send should not block. But the rest will.
+ fillQueue(queueA);
+
+ // Test sending to Queue B: it should not block.
+ CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+ assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+ TextMessage msg = (TextMessage)consumer.receive();
+ assertEquals("Message 1", msg.getText());
+ msg.acknowledge();
+
+ pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+ assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+ msg = (TextMessage)consumer.receive();
+ assertEquals("Message 2", msg.getText());
+ msg.acknowledge();
+ }
+
+ /**
+  * Sanity check without filling any queue: with always-sync-send enabled, two sends
+  * to queue A must each complete within two seconds and be received in order.
+  *
+  * @throws Exception on any JMS failure or if the waiting thread is interrupted
+  */
+ public void testSimpleSendReceive() throws Exception {
+ ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+ factory.setAlwaysSyncSend(true);
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queueA);
+
+ // Test sending to Queue A: it should not block.
+ CountDownLatch pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1");
+ assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS));
+
+ TextMessage msg = (TextMessage)consumer.receive();
+ assertEquals("Message 1", msg.getText());
+ msg.acknowledge();
+
+ pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2");
+ assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS));
+
+ msg = (TextMessage)consumer.receive();
+ assertEquals("Message 2", msg.getText());
+ msg.acknowledge();
+ }
+
+ /**
+  * With an unmodified connection factory, fills queue A and then asserts that a send
+  * to queue B does NOT complete within two seconds.
+  *
+  * @throws Exception on any JMS failure or if the waiting thread is interrupted
+  */
+ public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+ ConnectionFactory factory = createConnectionFactory();
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ // Test sending to Queue A
+ // 1st send should not block.
+ fillQueue(queueA);
+
+ // Test sending to Queue B: it should block.
+ // Even though its queue limits have not been reached, the
+ // connection
+ // is blocked.
+ CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+ assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+ }
+
+ /**
+  * Sends non-persistent "Hello World" messages to {@code queue} from a background
+  * thread and returns once the producer appears blocked or a resource exception has
+  * been flagged.
+  *
+  * @param queue destination to fill
+  * @throws JMSException declared but not thrown by the visible code; JMS errors inside
+  *         the fill thread are swallowed there
+  * @throws InterruptedException if interrupted while polling for the blocked state
+  */
+ private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
+ final AtomicBoolean done = new AtomicBoolean(true);
+ final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+ // Starts an async thread that sets the done flag to false every time
+ // it publishes.
+ // Once the send starts to block it will not reset the done flag
+ // anymore.
+ new Thread("Fill thread.") {
+ public void run() {
+ Session session = null;
+ try {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ while (keepGoing.get()) {
+ done.set(false);
+ producer.send(session.createTextMessage("Hello World"));
+ }
+ } catch (JMSException e) {
+ // deliberately ignored: any JMS failure simply ends the fill thread
+ } finally {
+ safeClose(session);
+ }
+ }
+ }.start();
+
+ waitForBlockedOrResourceLimit(done);
+ // Ask the fill thread to stop; it only sees this once its current send() returns.
+ keepGoing.set(false);
+ }
+
+ /**
+  * Polls once per second until the producer looks blocked or a resource exception
+  * has been flagged.
+  * <p>
+  * After each unsuccessful check the flag is set to {@code true}; a producer that is
+  * still sending is expected to set it back to {@code false} before the next check.
+  * If the flag is still {@code true} one second later, the producer is treated as
+  * blocked.
+  *
+  * @param done flag that the producer thread resets to {@code false} on every send
+  * @throws InterruptedException if interrupted while sleeping between polls
+  */
+ protected void waitForBlockedOrResourceLimit(final AtomicBoolean done)
+         throws InterruptedException {
+     Thread.sleep(1000);
+     // the producer is blocked once the done flag stays true or there is a resource exception
+     while (!done.get() && !gotResourceException.get()) {
+         // arm the flag and give the producer another second to clear it
+         done.set(true);
+         Thread.sleep(1000);
+     }
+ }
+
+ /**
+  * Sends one non-persistent text message to {@code queue} from a new thread.
+  *
+  * @param queue destination to send to
+  * @param message text of the message
+  * @return a latch that is counted down only after {@code send} has returned normally;
+  *         it stays at 1 if the send blocks or a {@link JMSException} occurs
+  * @throws JMSException declared but not thrown by the visible code
+  */
+ private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
+ final CountDownLatch done = new CountDownLatch(1);
+ new Thread("Send thread.") {
+ public void run() {
+ Session session = null;
+ try {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producer.send(session.createTextMessage(message));
+ done.countDown();
+ } catch (JMSException e) {
+ // deliberately ignored: callers detect the failure through the un-counted latch
+ } finally {
+ safeClose(session);
+ }
+ }
+ }.start();
+ return done;
+ }
+
+ /**
+  * Enables auto-fail and then runs the base-class set-up.
+  * NOTE(review): presumably auto-fail makes the base class abort a test that hangs
+  * (these tests block on purpose) - confirm in the base class.
+  */
+ public void setUp() throws Exception {
+ setAutoFail(true);
+ super.setUp();
+ }
+
+ /**
+  * Force-closes the connection used by the test and then runs the base-class tear-down.
+  * <p>
+  * An {@link IOException} is pushed into the TCP transport's listener before the
+  * transport is stopped, presumably so that senders still blocked on the connection
+  * are released. The base-class tear-down always runs, even if disposing the
+  * transport fails.
+  *
+  * @throws Exception if stopping the transport or the base-class tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception {
+     try {
+         if (connection != null) {
+             TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class);
+             // narrow() may return null when no TcpTransport is in the transport chain
+             if (t != null) {
+                 t.getTransportListener().onException(new IOException("Disposed."));
+             }
+             connection.getTransport().stop();
+         }
+     } finally {
+         // never skip base-class cleanup, otherwise its resources leak into later tests
+         super.tearDown();
+     }
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return protocol.getConnectionFactory(broker);
+ }
+}
Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml?rev=1364773&r1=1364772&r2=1364773&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml Mon Jul 23 19:49:23 2012
@@ -21,8 +21,15 @@
<virtual_host id="default" purge_on_startup="true" auto_create_queues="true">
<host_name>localhost</host_name>
<queue id="mirrored.**" mirrored="true"/>
+
+ <queue id="quota-1.**" quota="1"/>
+ <topic id="quota-1.**" slow_consumer_policy="queue">
+ <subscription quota="1"/>
+ </topic>
+
</virtual_host>
+ <!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
</broker>
\ No newline at end of file