You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/23 21:49:24 UTC

svn commit: r1364773 - in /activemq/activemq-apollo/trunk/apollo-itests: ./ src/test/java/org/apache/activemq/apollo/ src/test/resources/

Author: chirino
Date: Mon Jul 23 19:49:23 2012
New Revision: 1364773

URL: http://svn.apache.org/viewvc?rev=1364773&view=rev
Log:
Ported over the ProducerFlowControlTest classes from ActiveMQ 5

Added:
    activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java
    activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java
Modified:
    activemq/activemq-apollo/trunk/apollo-itests/pom.xml
    activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml

Modified: activemq/activemq-apollo/trunk/apollo-itests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/pom.xml?rev=1364773&r1=1364772&r2=1364773&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/pom.xml Mon Jul 23 19:49:23 2012
@@ -56,6 +56,19 @@
     </dependency>
 
     <!-- So we can boot up a broker -->
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <version>${jackson-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>${jackson-version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>apollo-stomp</artifactId>

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java?rev=1364773&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlSendFailTest.java Mon Jul 23 19:49:23 2012
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
+
+    public static Test suite() {
+        return suite(ProducerFlowControlSendFailTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+//    protected BrokerService createBroker() throws Exception {
+//        BrokerService service = new BrokerService();
+//        service.setPersistent(false);
+//        service.setUseJmx(false);
+//
+//        // Setup a destination policy where it takes only 1 message at a time.
+//        PolicyMap policyMap = new PolicyMap();
+//        PolicyEntry policy = new PolicyEntry();
+//        policy.setMemoryLimit(1);
+//        policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+//        policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+//        policy.setProducerFlowControl(true);
+//        policyMap.setDefaultEntry(policy);
+//        service.setDestinationPolicy(policyMap);
+//
+//        service.getSystemUsage().setSendFailIfNoSpace(true);
+//
+//        connector = service.addConnector("tcp://localhost:0");
+//        return service;
+//    }
+
+    @Override
+    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+        // with sendFailIfNoSpace set, there is no blocking of the connection
+    }
+    
+    @Override // re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed
+    public void ignoreAsyncPubisherRecoverAfterBlock() throws Exception {
+        // sendFail means no flowControllwindow as there is no producer ack, just an exception
+    }
+    
+    @Override // re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed
+    public void ignorePubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        // with sendFail, there must be no flowControllwindow
+        // sendFail is an alternative flow control mechanism that does not block
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+   
+        Thread thread = new Thread("Filler") {
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    try {
+                        producer.send(session.createTextMessage("Test message"));
+                        if (gotResourceException.get()) {
+                            // do not flood the broker with requests when full as we are sending async and they 
+                            // will be limited by the network buffers
+                            Thread.sleep(200);
+                        }
+                    } catch (Exception e) {
+                        // with async send, there will be no exceptions
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+        thread.start();
+        waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+
+        // resourceException on second message, resumption if we
+        // can receive 10
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 10; ++idx) {
+            msg = (TextMessage) consumer.receive(1000);
+            if (msg != null) {
+                msg.acknowledge();
+            }
+        }
+        keepGoing.set(false);
+    }
+
+    // re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed
+    public void ignorePubisherRecoverAfterBlockWithSyncSend() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setExceptionListener(null);
+        factory.setUseAsyncSend(false);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        final AtomicInteger exceptionCount = new AtomicInteger(0);
+        Thread thread = new Thread("Filler") {
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    try {
+                        producer.send(session.createTextMessage("Test message"));
+                    } catch (JMSException arg0) {
+                        if (arg0 instanceof ResourceAllocationException) {
+                            gotResourceException.set(true);
+                            exceptionCount.incrementAndGet();
+                        }
+                    }
+                }
+            }
+        };
+        thread.start();
+        waitForBlockedOrResourceLimit(new AtomicBoolean(false));
+
+        // resourceException on second message, resumption if we
+        // can receive 10
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 10; ++idx) {
+            msg = (TextMessage) consumer.receive(1000);
+            if (msg != null) {
+                msg.acknowledge();
+            }
+        }
+        assertTrue("we were blocked at least 5 times", 5 < exceptionCount.get());
+        keepGoing.set(false);
+    }
+    
+	@Override
+	protected ConnectionFactory createConnectionFactory() throws Exception {
+		ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) super.createConnectionFactory();
+		connectionFactory.setExceptionListener(new ExceptionListener() {
+				public void onException(JMSException arg0) {
+					if (arg0 instanceof ResourceAllocationException) {
+						gotResourceException.set(true);
+					}
+				}
+	        });
+		return connectionFactory;
+	}
+}

Added: activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java?rev=1364773&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/java/org/apache/activemq/apollo/ProducerFlowControlTest.java Mon Jul 23 19:49:23 2012
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProducerFlowControlTest extends JmsTestBase {
+
+    public static Test suite() {
+        return suite(ProducerFlowControlTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    //    protected BrokerService createBroker() throws Exception {
+    //        BrokerService service = new BrokerService();
+    //        service.setPersistent(false);
+    //        service.setUseJmx(false);
+    //
+    //        // Setup a destination policy where it takes only 1 message at a time.
+    //        PolicyMap policyMap = new PolicyMap();
+    //        PolicyEntry policy = new PolicyEntry();
+    //        policy.setMemoryLimit(1);
+    //        policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+    //        policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+    //        policy.setProducerFlowControl(true);
+    //        policyMap.setDefaultEntry(policy);
+    //        service.setDestinationPolicy(policyMap);
+    //
+    //        connector = service.addConnector("tcp://localhost:0");
+    //        return service;
+    //    }
+
+    static final Logger LOG = LoggerFactory.getLogger(ProducerFlowControlTest.class);
+    ActiveMQQueue queueA = new ActiveMQQueue("quota-1.QUEUE.A");
+    ActiveMQQueue queueB = new ActiveMQQueue("quota-1.QUEUE.B");
+    protected ActiveMQConnection connection;
+    // used to test sendFailIfNoSpace on SystemUsage 
+    protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
+
+    // This test is only valid against the Openwire protocol.
+    public void initCombos() {
+        setCombinationValues("protocol", new Object[]{new OpenwireBrokerProtocol()});
+    }
+
+    public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setProducerWindowSize(1024 * 64);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queueB);
+
+        // Test sending to Queue A
+        // 1 few sends should not block until the producer window is used up.
+        fillQueue(queueA);
+
+        // Test sending to Queue B it should not block since the connection
+        // should not be blocked.
+        CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+        assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+        TextMessage msg = (TextMessage)consumer.receive();
+        assertEquals("Message 1", msg.getText());
+        msg.acknowledge();
+
+        pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+        assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+        msg = (TextMessage)consumer.receive();
+        assertEquals("Message 2", msg.getText());
+        msg.acknowledge();
+    }
+
+    // re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed
+    public void ignorePubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        
+   
+		Thread thread = new Thread("Filler") {
+		    int i;
+			@Override
+			public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    try {
+						producer.send(session.createTextMessage("Test message " + ++i));
+						LOG.info("sent: " + i);
+					} catch (JMSException e) {
+					}
+                }
+			}
+		};
+		thread.start();
+        waitForBlockedOrResourceLimit(done);
+
+        // after receiveing messges, producer should continue sending messages 
+        // (done == false)
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 5; ++idx) {
+        	msg = (TextMessage) consumer.receive(1000);
+        	LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
+        	msg.acknowledge();
+        }
+        Thread.sleep(1000);
+        keepGoing.set(false);
+    	
+		assertFalse("producer has resumed", done.get());
+    }
+
+    // re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed
+    public void ignoreAsyncPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setProducerWindowSize(1024 * 5);
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        
+   
+        Thread thread = new Thread("Filler") {
+            int i;
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    try {
+                        producer.send(session.createTextMessage("Test message " + ++i));
+                        LOG.info("sent: " + i);
+                    } catch (JMSException e) {
+                    }
+                }
+            }
+        };
+        thread.start();
+        waitForBlockedOrResourceLimit(done);
+
+        // after receiveing messges, producer should continue sending messages 
+        // (done == false)
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 5; ++idx) {
+            msg = (TextMessage) consumer.receive(1000);
+            assertNotNull("Got a message", msg);
+            LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
+            msg.acknowledge();
+        }
+        Thread.sleep(1000);
+        keepGoing.set(false);
+        
+        assertFalse("producer has resumed", done.get());
+    }
+
+    // re-enable once https://issues.apache.org/jira/browse/APLO-225 is fixed
+    public void ignore2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queueB);
+
+        // Test sending to Queue A
+        // 1st send should not block. But the rest will.
+        fillQueue(queueA);
+
+        // Test sending to Queue B it should not block.
+        CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+        assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+        TextMessage msg = (TextMessage)consumer.receive();
+        assertEquals("Message 1", msg.getText());
+        msg.acknowledge();
+
+        pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
+        assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+
+        msg = (TextMessage)consumer.receive();
+        assertEquals("Message 2", msg.getText());
+        msg.acknowledge();
+    }
+
+    public void testSimpleSendReceive() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queueA);
+
+        // Test sending to Queue B it should not block.
+        CountDownLatch pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1");
+        assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS));
+
+        TextMessage msg = (TextMessage)consumer.receive();
+        assertEquals("Message 1", msg.getText());
+        msg.acknowledge();
+
+        pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2");
+        assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS));
+
+        msg = (TextMessage)consumer.receive();
+        assertEquals("Message 2", msg.getText());
+        msg.acknowledge();
+    }
+
+    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        // Test sending to Queue A
+        // 1st send should not block.
+        fillQueue(queueA);
+
+        // Test sending to Queue B it should block.
+        // Since even though the it's queue limits have not been reached, the
+        // connection
+        // is blocked.
+        CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
+        assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
+    }
+
+    private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+
+        // Starts an async thread that every time it publishes it sets the done
+        // flag to false.
+        // Once the send starts to block it will not reset the done flag
+        // anymore.
+        new Thread("Fill thread.") {
+            public void run() {
+                Session session = null;
+                try {
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    while (keepGoing.get()) {
+                        done.set(false);
+                        producer.send(session.createTextMessage("Hello World"));
+                    }
+                } catch (JMSException e) {
+                } finally {
+                    safeClose(session);
+                }
+            }
+        }.start();
+
+        waitForBlockedOrResourceLimit(done);
+        keepGoing.set(false);
+    }
+
+    protected void waitForBlockedOrResourceLimit(final AtomicBoolean done)
+            throws InterruptedException {
+        while (true) {
+            Thread.sleep(1000);
+            // the producer is blocked once the done flag stays true or there is a resource exception
+            if (done.get() || gotResourceException.get()) {
+                break;
+            }
+            done.set(true);
+        }
+    }
+
+    private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
+        final CountDownLatch done = new CountDownLatch(1);
+        new Thread("Send thread.") {
+            public void run() {
+                Session session = null;
+                try {
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    producer.send(session.createTextMessage(message));
+                    done.countDown();
+                } catch (JMSException e) {
+                } finally {
+                    safeClose(session);
+                }
+            }
+        }.start();
+        return done;
+    }
+
+    public void setUp() throws Exception {
+        setAutoFail(true);
+        super.setUp();
+    }
+    
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class);
+            t.getTransportListener().onException(new IOException("Disposed."));
+            connection.getTransport().stop();
+        }
+        super.tearDown();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return protocol.getConnectionFactory(broker);
+    }
+}

Modified: activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml?rev=1364773&r1=1364772&r2=1364773&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/src/test/resources/apollo.xml Mon Jul 23 19:49:23 2012
@@ -21,8 +21,15 @@
   <virtual_host id="default" purge_on_startup="true" auto_create_queues="true">
     <host_name>localhost</host_name>
     <queue id="mirrored.**" mirrored="true"/>
+
+    <queue id="quota-1.**" quota="1"/>
+    <topic id="quota-1.**" slow_consumer_policy="queue">
+      <subscription quota="1"/>
+    </topic>
+
   </virtual_host>
 
+  <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
 
 </broker>
\ No newline at end of file