You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/26 17:14:15 UTC
svn commit: r490319 - in
/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases:
AMQFailoverIssue.java AMQStackOverFlowTest.java
Author: rajdavies
Date: Tue Dec 26 08:14:14 2006
New Revision: 490319
URL: http://svn.apache.org/viewvc?view=rev&rev=490319
Log:
use cases submitted by Brian Diesenhaus
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java (with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java (with props)
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java?view=auto&rev=490319
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java Tue Dec 26 08:14:14 2006
@@ -0,0 +1,218 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class AMQFailoverIssue extends TestCase{
+
+ private static final String URL1="tcp://localhost:61616";
+ private static final String QUEUE1_NAME="test.queue.1";
+ private static final int MAX_CONSUMERS=10;
+ private static final int MAX_PRODUCERS=5;
+ private static final int NUM_MESSAGE_TO_SEND=10000;
+ private static final int TOTAL_MESSAGES=MAX_PRODUCERS * NUM_MESSAGE_TO_SEND;
+ private static final boolean USE_FAILOVER=true;
+ private AtomicInteger messageCount=new AtomicInteger();
+ private CountDownLatch doneLatch;
+
+    /**
+     * Intentionally empty: no shared fixture is prepared before the test method runs.
+     *
+     * @throws Exception never thrown by this implementation
+     */
+    @Override
+    public void setUp() throws Exception {
+    }
+
+    /**
+     * Intentionally empty: no shared fixture is released after the test method runs.
+     *
+     * @throws Exception never thrown by this implementation
+     */
+    @Override
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Starts one broker, attaches a listener container with {@code MAX_CONSUMERS} concurrent
+     * consumers to {@code QUEUE1_NAME}, lets {@code MAX_PRODUCERS} pooled producers send
+     * {@code NUM_MESSAGE_TO_SEND} messages each, and verifies that every message is consumed
+     * within 45 seconds.
+     * <p>
+     * This should fail with incubator-activemq-fuse-4.1.0.5.
+     *
+     * @throws Exception if the broker or one of the clients cannot be started or stopped
+     */
+    public void testFailoverIssue() throws Exception {
+        BrokerService brokerService1 = null;
+        PooledConnectionFactory pcf = null;
+        DefaultMessageListenerContainer container1 = null;
+        ExecutorService executor = null;
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, null);
+            brokerService1.start();
+            final ActiveMQConnectionFactory acf = createConnectionFactory(URL1, USE_FAILOVER);
+            pcf = new PooledConnectionFactory(acf);
+            // One latch count per expected message; the listener releases one per delivery.
+            doneLatch = new CountDownLatch(TOTAL_MESSAGES);
+            container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(0), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+            // NOTE(review): presumably gives the consumers time to connect before producing - confirm.
+            Thread.sleep(5000);
+            executor = Executors.newCachedThreadPool();
+            for (int i = 0; i < MAX_PRODUCERS; i++) {
+                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+            }
+            // Wait for all messages to arrive. The result is captured first so that the
+            // failure message reports the count observed after the wait, not before it.
+            final boolean allArrived = doneLatch.await(45, TimeUnit.SECONDS);
+            assertTrue("timed out: received " + messageCount.get() + " of " + TOTAL_MESSAGES + " messages",
+                    allArrived);
+            Assert.assertEquals(TOTAL_MESSAGES, messageCount.get());
+        } finally {
+            // Every resource is null-checked so that a failure during start-up is reported as
+            // itself rather than being masked by a NullPointerException raised here. Clients
+            // are stopped before the broker, and the broker is stopped even if a client
+            // fails to shut down cleanly.
+            try {
+                if (executor != null) {
+                    // Also interrupts producers that are still running after a timeout.
+                    executor.shutdownNow();
+                }
+                if (container1 != null) {
+                    container1.stop();
+                    container1.destroy();
+                }
+                if (pcf != null) {
+                    pcf.stop();
+                }
+            } finally {
+                if (brokerService1 != null) {
+                    brokerService1.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds, without starting, a non-persistent broker with JMX enabled, a broker-wide memory
+     * limit, one destination policy applied to the {@code ">"} queue pattern, and a single
+     * transport connector. When a second URI is supplied, a static network connector to it is
+     * added as well.
+     *
+     * @param brokerName name of the broker, also used as the prefix of its connector names
+     * @param uri1 URI assigned to the broker's transport connector
+     * @param uri2 URI the network connector should bridge to, or {@code null} for no bridge
+     * @return the configured broker; the caller starts and stops it
+     * @throws Exception if a URI cannot be parsed or a connector cannot be added
+     */
+    private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2)
+            throws Exception {
+        final BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+
+        // Broker-wide memory limit.
+        final UsageManager usage = new UsageManager();
+        usage.setLimit(5000000);
+        broker.setMemoryManager(usage);
+
+        // One policy entry for the ">" queue pattern with a memory limit of 1.
+        final PolicyEntry queuePolicy = new PolicyEntry();
+        queuePolicy.setQueue(">");
+        queuePolicy.setMemoryLimit(1);
+        final ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+        entries.add(queuePolicy);
+        final PolicyMap policies = new PolicyMap();
+        policies.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policies);
+
+        final TransportConnector transport = new TransportConnector();
+        transport.setUri(new URI(uri1));
+        transport.setBrokerName(brokerName);
+        transport.setName(brokerName + ".transportConnector");
+        broker.addConnector(transport);
+
+        if (uri2 == null) {
+            // No peer requested: the broker stands alone.
+            return broker;
+        }
+
+        final NetworkConnector bridge = new DiscoveryNetworkConnector(new URI("static:" + uri2));
+        bridge.setBridgeTempDestinations(true);
+        bridge.setBrokerName(brokerName);
+        bridge.setName(brokerName + ".nc");
+        bridge.setPrefetchSize(1);
+        broker.addNetworkConnector(bridge);
+        return broker;
+    }
+
+    /**
+     * Creates a listener container for the given queue with {@code MAX_CONSUMERS} concurrent
+     * consumers, using non-transacted sessions in auto-acknowledge mode. The container is only
+     * configured here; the caller still has to invoke {@code afterPropertiesSet()}.
+     *
+     * @param acf connection factory the container obtains its connections from
+     * @param listener listener that receives the messages
+     * @param queue name of the destination to consume from
+     * @return the configured container
+     */
+    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf,
+            final MessageListener listener, final String queue) {
+        final DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
+
+        // Where to connect and what to consume.
+        listenerContainer.setConnectionFactory(acf);
+        listenerContainer.setDestinationName(queue);
+        listenerContainer.setMessageListener(listener);
+
+        // Session settings and consumer concurrency.
+        listenerContainer.setSessionTransacted(false);
+        listenerContainer.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+        listenerContainer.setConcurrentConsumers(MAX_CONSUMERS);
+
+        return listenerContainer;
+    }
+
+    /**
+     * Creates a connection factory for the given broker URL, optionally wrapping the URL in a
+     * {@code failover:(...)} transport URI.
+     *
+     * @param url transport URL of the broker
+     * @param useFailover {@code true} to connect through the failover transport,
+     *        {@code false} to use {@code url} as is
+     * @return a factory with synchronous sends, asynchronous dispatch, and no message copying,
+     *         compression or optimized acknowledgement
+     */
+    public ActiveMQConnectionFactory createConnectionFactory(final String url, final boolean useFailover) {
+        final String brokerUrl = useFailover ? "failover:(" + url + ")" : url;
+        final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(brokerUrl);
+        acf.setCopyMessageOnSend(false);
+        // Set once; the original repeated this call at the end of the method.
+        acf.setUseAsyncSend(false);
+        acf.setDispatchAsync(true);
+        acf.setUseCompression(false);
+        acf.setOptimizeAcknowledge(false);
+        acf.setOptimizedMessageDispatch(true);
+        return acf;
+    }
+
+    /**
+     * Listener that counts each delivered message in the enclosing test's {@code messageCount},
+     * counts down its {@code doneLatch}, and then sleeps for a configurable time.
+     */
+    private class TestMessageListener1 implements MessageListener {
+
+        private final long waitTime;
+
+        /**
+         * @param waitTime number of milliseconds to sleep after each message
+         */
+        public TestMessageListener1(long waitTime) {
+            this.waitTime = waitTime;
+        }
+
+        /**
+         * Records the delivery and then sleeps for the configured wait time.
+         *
+         * @param msg the delivered message; its content is not inspected
+         */
+        public void onMessage(Message msg) {
+            messageCount.incrementAndGet();
+            doneLatch.countDown();
+            try {
+                Thread.sleep(waitTime);
+            } catch (InterruptedException e) {
+                // Restore the interrupt status instead of swallowing it, so the thread that
+                // invoked this listener can still see that it was asked to stop.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Producer task that sends {@code NUM_MESSAGE_TO_SEND} non-persistent 2048-byte messages to
+     * one queue through a {@link JmsTemplate} backed by the given pooled connection factory.
+     * <p>
+     * Declared static because it uses no instance state of the enclosing test.
+     */
+    private static class PooledProducerTask implements Runnable {
+
+        private final String queueName;
+        private final PooledConnectionFactory pcf;
+
+        /**
+         * @param pcf pooled connection factory used for every send
+         * @param queueName name of the queue to send to
+         */
+        public PooledProducerTask(final PooledConnectionFactory pcf, final String queueName) {
+            this.pcf = pcf;
+            this.queueName = queueName;
+        }
+
+        /**
+         * Configures the template, waits two seconds, then sends the messages. Failures are
+         * printed rather than propagated; the test detects them through its latch timeout.
+         */
+        public void run() {
+            try {
+                final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+                jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                jmsTemplate.setExplicitQosEnabled(true);
+                jmsTemplate.setMessageIdEnabled(false);
+                jmsTemplate.setMessageTimestampEnabled(false);
+                jmsTemplate.afterPropertiesSet();
+
+                // The same random payload is reused for every message.
+                final byte[] bytes = new byte[2048];
+                new Random().nextBytes(bytes);
+                Thread.sleep(2000);
+
+                final AtomicInteger count = new AtomicInteger();
+                // The creator is loop-invariant, so it is built once instead of per send.
+                final MessageCreator creator = new MessageCreator() {
+
+                    public Message createMessage(Session session) throws JMSException {
+                        final BytesMessage message = session.createBytesMessage();
+                        message.writeBytes(bytes);
+                        message.setIntProperty("count", count.incrementAndGet());
+                        message.setStringProperty("producer", "pooled");
+                        return message;
+                    }
+                };
+                for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+                    jmsTemplate.send(queueName, creator);
+                }
+            } catch (final InterruptedException e) {
+                // Keep the interrupt status visible to the executor that owns this thread.
+                Thread.currentThread().interrupt();
+                e.printStackTrace();
+            } catch (final Throwable e) {
+                // Best-effort reporting: the task was submitted without anyone reading its
+                // Future, so a rethrown failure would otherwise go unnoticed.
+                e.printStackTrace();
+            }
+        }
+    }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java?view=auto&rev=490319
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java Tue Dec 26 08:14:14 2006
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+
+public class AMQStackOverFlowTest extends TestCase {
+
+ private static final String URL1 = "tcp://localhost:61616";
+
+ private static final String URL2 = "tcp://localhost:61617";
+
+    /**
+     * Starts two brokers that are networked to each other, sends a text message to
+     * {@code test.q} through the first broker and receives it through the second, then sends
+     * a message to {@code test2.q} through the second broker and receives it through the first.
+     *
+     * @throws Exception if a broker cannot be started or stopped, or a JMS operation fails
+     */
+    public void testStackOverflow() throws Exception {
+        BrokerService brokerService1 = null;
+        BrokerService brokerService2 = null;
+
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, URL2);
+            brokerService1.start();
+            brokerService2 = createBrokerService("broker2", URL2, URL1);
+            brokerService2.start();
+
+            final ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory(URL1);
+            cf1.setUseAsyncSend(false);
+
+            final ActiveMQConnectionFactory cf2 = new ActiveMQConnectionFactory(URL2);
+            cf2.setUseAsyncSend(false);
+
+            final JmsTemplate template1 = new JmsTemplate(cf1);
+            template1.setReceiveTimeout(10000);
+
+            final JmsTemplate template2 = new JmsTemplate(cf2);
+            template2.setReceiveTimeout(10000);
+
+            // First direction: produce via broker1, consume via broker2.
+            template1.send("test.q", new MessageCreator() {
+
+                public Message createMessage(Session session) throws JMSException {
+                    return session.createTextMessage("test");
+                }
+
+            });
+
+            final Message m = template2.receive("test.q");
+            // instanceof is false for null, so this also covers a receive that timed out.
+            assertTrue("expected a TextMessage on test.q but got " + m, m instanceof TextMessage);
+            Assert.assertEquals("test", ((TextMessage) m).getText());
+
+            // Second direction: produce via broker2, consume via broker1.
+            template2.send("test2.q", new MessageCreator() {
+
+                public Message createMessage(Session session) throws JMSException {
+                    return session.createTextMessage("test2");
+                }
+
+            });
+
+            final Message m2 = template1.receive("test2.q");
+            assertTrue("expected a TextMessage on test2.q but got " + m2, m2 instanceof TextMessage);
+            Assert.assertEquals("test2", ((TextMessage) m2).getText());
+
+        } finally {
+            // Null checks keep a start-up failure from being masked by a NullPointerException,
+            // and the nested finally guarantees the second broker is stopped even when
+            // stopping the first one throws.
+            try {
+                if (brokerService1 != null) {
+                    brokerService1.stop();
+                }
+            } finally {
+                if (brokerService2 != null) {
+                    brokerService2.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds, without starting, a non-persistent broker with JMX enabled, a broker-wide memory
+     * limit of 10, one destination policy applied to the {@code ">"} queue pattern, and a
+     * single transport connector. When a second URI is supplied, a static network connector
+     * to it is added as well.
+     *
+     * @param brokerName name of the broker, also used as the prefix of its connector names
+     * @param uri1 URI assigned to the broker's transport connector
+     * @param uri2 URI the network connector should bridge to, or {@code null} for no bridge
+     * @return the configured broker; the caller starts and stops it
+     * @throws Exception if a URI cannot be parsed or a connector cannot be added
+     */
+    private BrokerService createBrokerService(final String brokerName,
+            final String uri1, final String uri2) throws Exception {
+        final BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+
+        // Broker-wide memory limit.
+        final UsageManager usage = new UsageManager();
+        usage.setLimit(10);
+        broker.setMemoryManager(usage);
+
+        // One policy entry for the ">" queue pattern with a memory limit of 1.
+        final PolicyEntry queuePolicy = new PolicyEntry();
+        queuePolicy.setQueue(">");
+        queuePolicy.setMemoryLimit(1);
+        final ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+        entries.add(queuePolicy);
+        final PolicyMap policies = new PolicyMap();
+        policies.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policies);
+
+        final TransportConnector transport = new TransportConnector();
+        transport.setUri(new URI(uri1));
+        transport.setBrokerName(brokerName);
+        transport.setName(brokerName + ".transportConnector");
+        broker.addConnector(transport);
+
+        if (uri2 == null) {
+            // No peer requested: the broker stands alone.
+            return broker;
+        }
+
+        final URI peer = new URI("static:" + uri2);
+        final NetworkConnector bridge = new DiscoveryNetworkConnector(peer);
+        bridge.setBridgeTempDestinations(true);
+        bridge.setBrokerName(brokerName);
+        bridge.setName(brokerName + ".nc");
+        bridge.setPrefetchSize(1);
+        broker.addNetworkConnector(bridge);
+        return broker;
+    }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java
------------------------------------------------------------------------------
svn:eol-style = native