You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/04/20 17:05:30 UTC
svn commit: r935952 - in /activemq/trunk: ./
activemq-core/src/main/java/org/apache/activemq/xbean/ activemq-pool/
activemq-pool/src/main/java/org/apache/activemq/pool/
activemq-pool/src/test/java/org/apache/activemq/usecases/ activemq-spring/
activemq...
Author: dejanb
Date: Tue Apr 20 15:05:29 2010
New Revision: 935952
URL: http://svn.apache.org/viewvc?rev=935952&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2702 - introducing activemq-spring module
Added:
activemq/trunk/activemq-spring/ (with props)
activemq/trunk/activemq-spring/pom.xml (with props)
activemq/trunk/activemq-spring/src/
activemq/trunk/activemq-spring/src/main/
activemq/trunk/activemq-spring/src/main/java/
activemq/trunk/activemq-spring/src/main/java/org/
activemq/trunk/activemq-spring/src/main/java/org/apache/
activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/
activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/
activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java
activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/spring/
activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/
activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java
activemq/trunk/activemq-spring/src/main/resources/
activemq/trunk/activemq-spring/src/test/
activemq/trunk/activemq-spring/src/test/java/
activemq/trunk/activemq-spring/src/test/java/org/
activemq/trunk/activemq-spring/src/test/java/org/apache/
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
activemq/trunk/activemq-spring/src/test/resources/
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/
Modified:
activemq/trunk/activemq-pool/pom.xml
activemq/trunk/pom.xml
Modified: activemq/trunk/activemq-pool/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/pom.xml?rev=935952&r1=935951&r2=935952&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/pom.xml (original)
+++ activemq/trunk/activemq-pool/pom.xml Tue Apr 20 15:05:29 2010
@@ -35,7 +35,6 @@
javax.transaction*;resolution:=optional,
org.apache.activemq.ra*;resolution:=optional,
org.apache.geronimo.transaction.manager*;resolution:=optional,
- org.springframework*;resolution:=optional,
*
</activemq.osgi.import.pkg>
<activemq.osgi.export>
@@ -80,10 +79,6 @@
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-beans</artifactId>
- </dependency>
- <dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-core</artifactId>
<type>test-jar</type>
@@ -95,25 +90,10 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-jms</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.xbean</groupId>
- <artifactId>xbean-spring</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
Propchange: activemq/trunk/activemq-spring/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Apr 20 15:05:29 2010
@@ -0,0 +1,4 @@
+.classpath
+.settings
+.project
+target
Added: activemq/trunk/activemq-spring/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/pom.xml?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/pom.xml (added)
+++ activemq/trunk/activemq-spring/pom.xml Tue Apr 20 15:05:29 2010
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-parent</artifactId>
+ <version>5.4-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>activemq-spring</artifactId>
+ <packaging>bundle</packaging>
+ <name>ActiveMQ :: Spring</name>
+ <description>ActiveMQ Spring Integration</description>
+
+  <properties> <!-- OSGi package lists; every export sets -noimport:=true so it is not also imported -->
+    <activemq.osgi.import.pkg>
+      javax.transaction*;resolution:=optional,
+      org.apache.geronimo.transaction.manager*;resolution:=optional,
+      org.springframework*;resolution:=optional,
+      *
+    </activemq.osgi.import.pkg>
+    <activemq.osgi.export>
+      org.apache.activemq.pool*;version=${project.version};-noimport:=true;-split-package:=merge-last,
+      org.apache.activemq.xbean*;version=${project.version};-noimport:=true;-split-package:=merge-last
+    </activemq.osgi.export>
+  </properties>
+
+ <dependencies>
+
+ <!-- =============================== -->
+ <!-- Required Dependencies -->
+ <!-- =============================== -->
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>activemq-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>activemq-pool</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.components</groupId>
+ <artifactId>geronimo-transaction</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-annotation_1.0_spec</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${pom.groupId}</groupId>
+ <artifactId>activemq-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jms</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
Propchange: activemq/trunk/activemq-spring/pom.xml
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java (added)
+++ activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.jms.ConnectionFactory;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pool.ObjectPoolFactory;
+import org.springframework.beans.factory.FactoryBean;
+
+/**
+ * Simple factory bean used to create a jencks connection pool.
+ * Depending on the properties set, it will create a simple pool,
+ * a transaction aware connection pool, or a jca aware connection pool.
+ *
+ * <pre class="code">
+ * <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
+ * <property name="connectionFactory" ref="connectionFactory" />
+ * <property name="transactionManager" ref="transactionManager" />
+ * <property name="resourceName" value="ResourceName" />
+ * </bean>
+ * </pre>
+ *
+ * The <code>resourceName</code> property should be used along with the {@link ActiveMQResourceManager} and have
+ * the same value as its <code>resourceName</code> property. This makes sure the transaction manager
+ * correctly maps the connection factory to the recovery process.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class PooledConnectionFactoryBean implements FactoryBean {
+
+ private static final Log LOGGER = LogFactory.getLog(PooledConnectionFactoryBean.class);
+
+ private PooledConnectionFactory pooledConnectionFactory;
+ private ConnectionFactory connectionFactory;
+ private int maxConnections = 1;
+ private int maximumActive = 500;
+ private Object transactionManager;
+ private String resourceName;
+ private ObjectPoolFactory poolFactory;
+
+ public int getMaxConnections() {
+ return maxConnections;
+ }
+
+ public void setMaxConnections(int maxConnections) {
+ this.maxConnections = maxConnections;
+ }
+
+ public int getMaximumActive() {
+ return maximumActive;
+ }
+
+ public void setMaximumActive(int maximumActive) {
+ this.maximumActive = maximumActive;
+ }
+
+ public Object getTransactionManager() {
+ return transactionManager;
+ }
+
+ public void setTransactionManager(Object transactionManager) {
+ this.transactionManager = transactionManager;
+ }
+
+ public String getResourceName() {
+ return resourceName;
+ }
+
+ public void setResourceName(String resourceName) {
+ this.resourceName = resourceName;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ public ObjectPoolFactory getPoolFactory() {
+ return poolFactory;
+ }
+
+ public void setPoolFactory(ObjectPoolFactory poolFactory) {
+ this.poolFactory = poolFactory;
+ }
+
+ /**
+ *
+ * @throws Exception
+ * @org.apache.xbean.InitMethod
+ */
+ @PostConstruct
+ public void afterPropertiesSet() throws Exception {
+ if (pooledConnectionFactory == null && transactionManager != null && resourceName != null) {
+ try {
+ LOGGER.debug("Trying to build a JcaPooledConnectionFactory");
+ JcaPooledConnectionFactory f = new JcaPooledConnectionFactory();
+ f.setName(resourceName);
+ f.setTransactionManager((TransactionManager) transactionManager);
+ f.setMaxConnections(maxConnections);
+ f.setMaximumActive(maximumActive);
+ f.setConnectionFactory(connectionFactory);
+ f.setPoolFactory(poolFactory);
+ this.pooledConnectionFactory = f;
+ } catch (Throwable t) {
+ LOGGER.debug("Could not create JCA enabled connection factory: " + t, t);
+ }
+ }
+ if (pooledConnectionFactory == null && transactionManager != null) {
+ try {
+ LOGGER.debug("Trying to build a XaPooledConnectionFactory");
+ XaPooledConnectionFactory f = new XaPooledConnectionFactory();
+ f.setTransactionManager((TransactionManager) transactionManager);
+ f.setMaxConnections(maxConnections);
+ f.setMaximumActive(maximumActive);
+ f.setConnectionFactory(connectionFactory);
+ f.setPoolFactory(poolFactory);
+ this.pooledConnectionFactory = f;
+ } catch (Throwable t) {
+ LOGGER.debug("Could not create XA enabled connection factory: " + t, t);
+ }
+ }
+ if (pooledConnectionFactory == null) {
+ try {
+ LOGGER.debug("Trying to build a PooledConnectionFactory");
+ PooledConnectionFactory f = new PooledConnectionFactory();
+ f.setMaxConnections(maxConnections);
+ f.setMaximumActive(maximumActive);
+ f.setConnectionFactory(connectionFactory);
+ f.setPoolFactory(poolFactory);
+ this.pooledConnectionFactory = f;
+ } catch (Throwable t) {
+ LOGGER.debug("Could not create pooled connection factory: " + t, t);
+ }
+ }
+ if (pooledConnectionFactory == null) {
+ throw new IllegalStateException("Unable to create pooled connection factory. Enable DEBUG log level for more informations");
+ }
+ }
+
+ /**
+ *
+ * @throws Exception
+ * @org.apache.xbean.DestroyMethod
+ */
+ @PreDestroy
+ public void destroy() throws Exception {
+ if (pooledConnectionFactory != null) {
+ pooledConnectionFactory.stop();
+ pooledConnectionFactory = null;
+ }
+ }
+
+ // FactoryBean methods
+ public Object getObject() throws Exception {
+ return pooledConnectionFactory;
+ }
+
+ public Class getObjectType() {
+ return ConnectionFactory.class;
+ }
+
+ public boolean isSingleton() {
+ return true;
+ }
+}
Added: activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java (added)
+++ activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.xbean;
+
+import java.util.HashMap;
+
+import org.apache.activemq.broker.BrokerService;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.core.io.Resource;
+
+/**
+ * Used to share a single broker even if you have multiple broker bean
+ * definitions. A use case is where you have multiple web applications that want
+ * to start an embedded broker but only the first one to deploy should actually
+ * start it.
+ *
+ * @version $Revision$
+ */
+public class PooledBrokerFactoryBean implements FactoryBean, InitializingBean, DisposableBean {
+
+ static final HashMap<String, SharedBroker> SHARED_BROKER_MAP = new HashMap<String, SharedBroker>();
+
+ private boolean start;
+ private Resource config;
+
+ static class SharedBroker {
+ BrokerFactoryBean factory;
+ int refCount;
+ }
+
+ public void afterPropertiesSet() throws Exception {
+ synchronized (SHARED_BROKER_MAP) {
+ SharedBroker sharedBroker = SHARED_BROKER_MAP.get(config.getFilename());
+ if (sharedBroker == null) {
+ sharedBroker = new SharedBroker();
+ sharedBroker.factory = new BrokerFactoryBean();
+ sharedBroker.factory.setConfig(config);
+ sharedBroker.factory.setStart(start);
+ sharedBroker.factory.afterPropertiesSet();
+ SHARED_BROKER_MAP.put(config.getFilename(), sharedBroker);
+ }
+ sharedBroker.refCount++;
+ }
+ }
+
+ public void destroy() throws Exception {
+ synchronized (SHARED_BROKER_MAP) {
+ SharedBroker sharedBroker = SHARED_BROKER_MAP.get(config.getFilename());
+ if (sharedBroker != null) {
+ sharedBroker.refCount--;
+ if (sharedBroker.refCount == 0) {
+ sharedBroker.factory.destroy();
+ SHARED_BROKER_MAP.remove(config.getFilename());
+ }
+ }
+ }
+ }
+
+ public Resource getConfig() {
+ return config;
+ }
+
+ public Object getObject() throws Exception {
+ synchronized (SHARED_BROKER_MAP) {
+ SharedBroker sharedBroker = SHARED_BROKER_MAP.get(config.getFilename());
+ if (sharedBroker != null) {
+ return sharedBroker.factory.getObject();
+ }
+ }
+ return null;
+ }
+
+ public Class getObjectType() {
+ return BrokerService.class;
+ }
+
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public boolean isStart() {
+ return start;
+ }
+
+ public void setConfig(Resource config) {
+ this.config = config;
+ }
+
+ public void setStart(boolean start) {
+ this.start = start;
+ }
+
+}
Added: activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
+
+ private static final transient Log LOG = LogFactory.getLog(AMQDeadlockTest3.class);
+
+ private static final String URL1 = "tcp://localhost:61616";
+
+ private static final String URL2 = "tcp://localhost:61617";
+
+ private static final String QUEUE1_NAME = "test.queue.1";
+
+ private static final String QUEUE2_NAME = "test.queue.2";
+
+ private static final int MAX_CONSUMERS = 1;
+
+ private static final int MAX_PRODUCERS = 1;
+
+ private static final int NUM_MESSAGE_TO_SEND = 10;
+
+ private AtomicInteger messageCount = new AtomicInteger();
+ private CountDownLatch doneLatch;
+
+ public void setUp() throws Exception {
+ }
+
+ public void tearDown() throws Exception {
+ }
+
+    // This should fail with incubator-activemq-fuse-4.1.0.5
+    // Single broker: only QUEUE1 has a consumer, so QUEUE2 fills up while one pooled factory feeds both.
+    public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
+        BrokerService brokerService1 = null;
+        ActiveMQConnectionFactory acf = null;
+        PooledConnectionFactory pcf = null;
+        DefaultMessageListenerContainer container1 = null;
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, null);
+            brokerService1.start();
+
+            acf = createConnectionFactory(URL1);
+            pcf = new PooledConnectionFactory(acf);
+
+            // Only listen on the first queue.. let the 2nd queue fill up.
+            doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
+            container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+
+            Thread.sleep(2000);
+
+            final ExecutorService executor = Executors.newCachedThreadPool();
+            for (int i = 0; i < MAX_PRODUCERS; i++) {
+                executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+                Thread.sleep(1000);
+                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+            }
+
+            // Wait for all message to arrive; stop the producers before asserting so a
+            // timeout does not leave their threads running.
+            final boolean allArrived = doneLatch.await(20, TimeUnit.SECONDS);
+            executor.shutdownNow();
+            assertTrue(allArrived);
+            Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
+        } finally {
+            // Setup may have failed part-way; calling through a null reference here would
+            // throw a NullPointerException that hides the original failure.
+            if (container1 != null) {
+                container1.stop();
+                container1.destroy();
+            }
+            if (brokerService1 != null) {
+                brokerService1.stop();
+            }
+        }
+    }
+
+    // This should fail with incubator-activemq-fuse-4.1.0.5
+    // Two networked brokers: one pooled factory produces via URL1, the only consumer reads QUEUE1 via URL2.
+    public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing() throws Exception {
+        BrokerService brokerService1 = null;
+        BrokerService brokerService2 = null;
+        ActiveMQConnectionFactory acf1 = null;
+        ActiveMQConnectionFactory acf2 = null;
+        PooledConnectionFactory pcf = null;
+        DefaultMessageListenerContainer container1 = null;
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, URL2);
+            brokerService1.start();
+            brokerService2 = createBrokerService("broker2", URL2, URL1);
+            brokerService2.start();
+
+            acf1 = createConnectionFactory(URL1);
+            acf2 = createConnectionFactory(URL2);
+            pcf = new PooledConnectionFactory(acf1);
+            Thread.sleep(1000);
+
+            doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
+            container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+
+            final ExecutorService executor = Executors.newCachedThreadPool();
+            for (int i = 0; i < MAX_PRODUCERS; i++) {
+                executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+                Thread.sleep(1000);
+                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+            }
+
+            // Stop the producers before asserting so a timeout does not leave their threads running.
+            final boolean allArrived = doneLatch.await(20, TimeUnit.SECONDS);
+            executor.shutdownNow();
+            assertTrue(allArrived);
+            Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
+        } finally {
+            // Null-guard each resource: setup may have failed before it was created.
+            if (container1 != null) {
+                container1.stop();
+                container1.destroy();
+            }
+            if (brokerService1 != null) {
+                brokerService1.stop();
+            }
+            if (brokerService2 != null) {
+                brokerService2.stop();
+            }
+        }
+    }
+
+    // This should fail with incubator-activemq-fuse-4.1.0.5
+    // Two networked brokers: non-pooled producers send via URL1; QUEUE1 and QUEUE2 (30 s stall per message) are consumed via URL2.
+    public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing() throws Exception {
+        BrokerService brokerService1 = null;
+        BrokerService brokerService2 = null;
+        ActiveMQConnectionFactory acf1 = null;
+        ActiveMQConnectionFactory acf2 = null;
+        DefaultMessageListenerContainer container1 = null;
+        DefaultMessageListenerContainer container2 = null;
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, URL2);
+            brokerService1.start();
+            brokerService2 = createBrokerService("broker2", URL2, URL1);
+            brokerService2.start();
+
+            acf1 = createConnectionFactory(URL1);
+            acf2 = createConnectionFactory(URL2);
+            Thread.sleep(1000);
+
+            doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND * MAX_PRODUCERS);
+            container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+            container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
+            container2.afterPropertiesSet();
+
+            final ExecutorService executor = Executors.newCachedThreadPool();
+            for (int i = 0; i < MAX_PRODUCERS; i++) {
+                executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
+                Thread.sleep(1000);
+                executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
+            }
+
+            // Stop the producers before asserting so a timeout does not leave their threads running.
+            final boolean allArrived = doneLatch.await(20, TimeUnit.SECONDS);
+            executor.shutdownNow();
+            assertTrue(allArrived);
+            Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
+        } finally {
+            // Null-guard each resource: setup may have failed before it was created.
+            if (container1 != null) {
+                container1.stop();
+                container1.destroy();
+            }
+            if (container2 != null) {
+                container2.stop();
+                container2.destroy();
+            }
+            if (brokerService1 != null) {
+                brokerService1.stop();
+            }
+            if (brokerService2 != null) {
+                brokerService2.stop();
+            }
+        }
+    }
+
+ private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2) throws Exception {
+ final BrokerService brokerService = new BrokerService();
+
+ brokerService.setBrokerName(brokerName);
+ brokerService.setPersistent(false);
+ brokerService.setUseJmx(true);
+
+ final SystemUsage memoryManager = new SystemUsage();
+ memoryManager.getMemoryUsage().setLimit(5000000);
+ brokerService.setSystemUsage(memoryManager);
+
+ final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+
+ final PolicyEntry entry = new PolicyEntry();
+ entry.setQueue(">");
+ // entry.setQueue(QUEUE1_NAME);
+ entry.setMemoryLimit(1000);
+ policyEntries.add(entry);
+
+ final PolicyMap policyMap = new PolicyMap();
+ policyMap.setPolicyEntries(policyEntries);
+ brokerService.setDestinationPolicy(policyMap);
+
+ final TransportConnector tConnector = new TransportConnector();
+ tConnector.setUri(new URI(uri1));
+ tConnector.setName(brokerName + ".transportConnector");
+ brokerService.addConnector(tConnector);
+
+ if (uri2 != null) {
+ final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
+ nc.setBridgeTempDestinations(true);
+ nc.setBrokerName(brokerName);
+ brokerService.addNetworkConnector(nc);
+ }
+
+ return brokerService;
+
+ }
+
+ public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf, final MessageListener listener, final String queue) {
+ final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+ container.setConnectionFactory(acf);
+ container.setDestinationName(queue);
+ container.setMessageListener(listener);
+ container.setSessionTransacted(false);
+ container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+ container.setConcurrentConsumers(MAX_CONSUMERS);
+ return container;
+ }
+
+ public ActiveMQConnectionFactory createConnectionFactory(final String url) {
+ final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
+ acf.setCopyMessageOnSend(false);
+ acf.setUseAsyncSend(false);
+ acf.setDispatchAsync(true);
+ acf.setUseCompression(false);
+ acf.setOptimizeAcknowledge(false);
+ acf.setOptimizedMessageDispatch(true);
+ acf.setAlwaysSyncSend(true);
+ return acf;
+ }
+
+    /** Counts every delivered message, counts the latch down once, then holds the thread for waitTime ms. */
+    private class TestMessageListener1 implements MessageListener {
+        private final long waitTime;
+
+        public TestMessageListener1(long waitTime) {
+            this.waitTime = waitTime;
+        }
+
+        public void onMessage(Message msg) {
+            try {
+                LOG.info("Listener1 Consumed message " + msg.getIntProperty("count"));
+
+                messageCount.incrementAndGet();
+                doneLatch.countDown();
+
+                // Hold this consumer thread before it can take the next message.
+                Thread.sleep(waitTime);
+            } catch (JMSException e) {
+                // Thrown before the counters are touched, so this message is not counted.
+                LOG.error("Listener1 could not read the count property", e);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so the thread running this listener
+                // can observe the interruption instead of losing it.
+                Thread.currentThread().interrupt();
+                LOG.warn("Listener1 interrupted while waiting", e);
+            }
+        }
+    }
+
+ private static class PooledProducerTask implements Runnable {
+
+ private final String queueName;
+
+ private final PooledConnectionFactory pcf;
+
+ public PooledProducerTask(final PooledConnectionFactory pcf, final String queueName) {
+ this.pcf = pcf;
+ this.queueName = queueName;
+ }
+
+ public void run() {
+
+ try {
+
+ final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+ jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setMessageIdEnabled(false);
+ jmsTemplate.setMessageTimestampEnabled(false);
+ jmsTemplate.afterPropertiesSet();
+
+ final byte[] bytes = new byte[2048];
+ final Random r = new Random();
+ r.nextBytes(bytes);
+
+ Thread.sleep(2000);
+
+ final AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+ jmsTemplate.send(queueName, new MessageCreator() {
+
+ public Message createMessage(Session session) throws JMSException {
+
+ final BytesMessage message = session.createBytesMessage();
+
+ message.writeBytes(bytes);
+ message.setIntProperty("count", count.incrementAndGet());
+ message.setStringProperty("producer", "pooled");
+ return message;
+ }
+ });
+
+ LOG.info("PooledProducer sent message: " + count.get());
+ // Thread.sleep(1000);
+ }
+
+ } catch (final Throwable e) {
+ LOG.error("Producer 1 is exiting", e);
+ }
+ }
+ }
+
+ private static class NonPooledProducerTask implements Runnable {
+
+ private final String queueName;
+
+ private final ConnectionFactory cf;
+
+ public NonPooledProducerTask(final ConnectionFactory cf, final String queueName) {
+ this.cf = cf;
+ this.queueName = queueName;
+ }
+
+ public void run() {
+
+ try {
+
+ final JmsTemplate jmsTemplate = new JmsTemplate(cf);
+ jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setMessageIdEnabled(false);
+ jmsTemplate.setMessageTimestampEnabled(false);
+ jmsTemplate.afterPropertiesSet();
+
+ final byte[] bytes = new byte[2048];
+ final Random r = new Random();
+ r.nextBytes(bytes);
+
+ Thread.sleep(2000);
+
+ final AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+ jmsTemplate.send(queueName, new MessageCreator() {
+
+ public Message createMessage(Session session) throws JMSException {
+
+ final BytesMessage message = session.createBytesMessage();
+
+ message.writeBytes(bytes);
+ message.setIntProperty("count", count.incrementAndGet());
+ message.setStringProperty("producer", "non-pooled");
+ return message;
+ }
+ });
+
+ LOG.info("Non-PooledProducer sent message: " + count.get());
+
+ // Thread.sleep(1000);
+ }
+
+ } catch (final Throwable e) {
+ LOG.error("Producer 1 is exiting", e);
+ }
+ }
+ }
+
+}
Added: activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.test.*;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class AMQDeadlockTestW4Brokers extends org.apache.activemq.test.TestSupport {
+
+ // Shared logger; the nested listener and producer classes log through it as well.
+ private static final transient Log LOG = LogFactory.getLog(AMQDeadlockTestW4Brokers.class);
+
+ // Plain transport URIs: each broker listens on one of them and is handed the other
+ // three as network-connector peers (see createBrokerService).
+ private static final String BROKER_URL1 = "tcp://localhost:61616";
+ private static final String BROKER_URL2 = "tcp://localhost:61617";
+ private static final String BROKER_URL3 = "tcp://localhost:61618";
+ private static final String BROKER_URL4 = "tcp://localhost:61619";
+ // Client-side URLs for the same four ports with explicit wireFormat options; the test
+ // wraps each of them in a failover: URL before creating a connection factory.
+ private static final String URL1 = "tcp://localhost:61616?wireFormat.cacheEnabled=false"
+ + "&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
+ private static final String URL2 = "tcp://localhost:61617?wireFormat.cacheEnabled=false"
+ + "&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
+ private static final String URL3 = "tcp://localhost:61618?wireFormat.cacheEnabled=false"
+ + "&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
+ private static final String URL4 = "tcp://localhost:61619?wireFormat.cacheEnabled=false"
+ + "&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
+ // Queue that every producer sends to and that the listener container consumes from.
+ private static final String QUEUE1_NAME = "test.queue.1";
+ // Number of concurrent consumers configured on the listener container.
+ private static final int MAX_CONSUMERS = 5;
+ // Number of messages sent by each producer task.
+ private static final int NUM_MESSAGE_TO_SEND = 10000;
+ // Counted down once per message handled by TestMessageListener1. Being static, the
+ // count is shared by every instance of this test class in the JVM.
+ // NOTE(review): the initial count is MAX_CONSUMERS * NUM_MESSAGE_TO_SEND (50000), while the
+ // test starts 4 producers of NUM_MESSAGE_TO_SEND messages each (40000) - confirm the size.
+ private static final CountDownLatch LATCH = new CountDownLatch(MAX_CONSUMERS * NUM_MESSAGE_TO_SEND);
+
+ // Empty override of the inherited setUp: nothing is prepared here, the test method
+ // creates and starts its own brokers.
+ @Override
+ public void setUp() throws Exception {
+
+ }
+
+ // Empty override of the inherited tearDown: the test method stops its brokers and
+ // listener container itself in a finally block.
+ @Override
+ public void tearDown() throws Exception {
+
+ }
+
+    /**
+     * Starts four brokers that are networked to each other, attaches a listener container
+     * to broker2 and runs one pooled producer against each broker, then waits up to
+     * 15 seconds on the shared {@code LATCH} before checking its count.
+     *
+     * @throws Exception if a broker or the listener container cannot be started or
+     *         stopped, or if the wait is interrupted
+     */
+    public void test4BrokerWithOutLingo() throws Exception {
+
+        BrokerService brokerService1 = null;
+        BrokerService brokerService2 = null;
+        BrokerService brokerService3 = null;
+        BrokerService brokerService4 = null;
+        DefaultMessageListenerContainer container1 = null;
+        ExecutorService executor = null;
+
+        // Exceptions are deliberately not caught here: a broker or container failure must
+        // fail the test instead of being printed and ignored.
+        try {
+
+            // Test with and without queue limits.
+            brokerService1 = createBrokerService("broker1", BROKER_URL1, BROKER_URL2, BROKER_URL3, BROKER_URL4, 0 /* 10000000 */);
+            brokerService1.start();
+            brokerService2 = createBrokerService("broker2", BROKER_URL2, BROKER_URL1, BROKER_URL3, BROKER_URL4, 0/* 40000000 */);
+            brokerService2.start();
+            brokerService3 = createBrokerService("broker3", BROKER_URL3, BROKER_URL2, BROKER_URL1, BROKER_URL4, 0/* 10000000 */);
+            brokerService3.start();
+            brokerService4 = createBrokerService("broker4", BROKER_URL4, BROKER_URL1, BROKER_URL3, BROKER_URL2, 0/* 10000000 */);
+            brokerService4.start();
+
+            // All four clients use the same failover options; only the wrapped URL differs.
+            final String failoverOptions =
+                ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
+            final String failover1 = "failover:(" + URL1 + failoverOptions;
+            final String failover2 = "failover:(" + URL2 + failoverOptions;
+            final String failover3 = "failover:(" + URL3 + failoverOptions;
+            final String failover4 = "failover:(" + URL4 + failoverOptions;
+
+            final ActiveMQConnectionFactory acf1 = createConnectionFactory(failover1);
+            final ActiveMQConnectionFactory acf2 = createConnectionFactory(failover2);
+            final ActiveMQConnectionFactory acf3 = createConnectionFactory(failover3);
+            final ActiveMQConnectionFactory acf4 = createConnectionFactory(failover4);
+
+            final PooledConnectionFactory pcf1 = new PooledConnectionFactory(acf1);
+            final PooledConnectionFactory pcf2 = new PooledConnectionFactory(acf2);
+            final PooledConnectionFactory pcf3 = new PooledConnectionFactory(acf3);
+            final PooledConnectionFactory pcf4 = new PooledConnectionFactory(acf4);
+
+            // The only consumer side: a listener container on broker2's (non-pooled) factory.
+            container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(0), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+
+            final PooledProducerTask[] task = new PooledProducerTask[4];
+            task[0] = new PooledProducerTask(pcf1, QUEUE1_NAME, "producer1");
+            task[1] = new PooledProducerTask(pcf2, QUEUE1_NAME, "producer2");
+            task[2] = new PooledProducerTask(pcf3, QUEUE1_NAME, "producer3");
+            task[3] = new PooledProducerTask(pcf4, QUEUE1_NAME, "producer4");
+
+            executor = Executors.newCachedThreadPool();
+
+            for (int i = 0; i < task.length; i++) {
+                executor.submit(task[i]);
+            }
+
+            LATCH.await(15, TimeUnit.SECONDS);
+            // NOTE(review): this passes only while the latch still holds its initial count,
+            // i.e. when no message was counted down by the listener - confirm that this is
+            // the intended expectation.
+            assertTrue(LATCH.getCount() == MAX_CONSUMERS * NUM_MESSAGE_TO_SEND);
+        } finally {
+
+            // Interrupt producers that are still sending before the brokers go away.
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+
+            try {
+                // container1 is null when a broker failed to start; guard so that the
+                // original failure is not masked by a NullPointerException.
+                if (container1 != null) {
+                    container1.stop();
+                    container1.destroy();
+                }
+            } finally {
+                stopAll(brokerService1, brokerService2, brokerService3, brokerService4);
+            }
+        }
+    }
+
+    /**
+     * Stops every non-null broker, attempting all of them even if one fails.
+     *
+     * @param brokers brokers to stop; {@code null} entries (brokers that were never
+     *        created) are skipped
+     * @throws Exception the first failure raised by a broker while stopping, rethrown
+     *         after all brokers have been given a stop attempt
+     */
+    private static void stopAll(final BrokerService... brokers) throws Exception {
+        Exception firstFailure = null;
+        for (final BrokerService broker : brokers) {
+            if (broker == null) {
+                continue;
+            }
+            try {
+                broker.stop();
+            } catch (final Exception e) {
+                LOG.error("Failed to stop broker " + broker.getBrokerName(), e);
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * Builds (without starting) a non-persistent, JMX-enabled broker that listens on
+     * {@code uri1} and, when {@code uri2} is given, bridges to {@code uri2}, {@code uri3}
+     * and {@code uri4} through a single static network connector.
+     *
+     * @param brokerName name of the broker; also the prefix of the transport connector name
+     * @param uri1 URI the broker's transport connector listens on
+     * @param uri2 first peer URI, or {@code null} to create no network connector
+     * @param uri3 second peer URI
+     * @param uri4 third peer URI
+     * @param queueLimit memory limit applied to the policy entry for the ">" queue pattern
+     * @return the configured broker
+     * @throws Exception if a URI is malformed or a connector cannot be added
+     */
+    private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2, final String uri3, final String uri4, final int queueLimit)
+        throws Exception {
+        final BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+
+        // Broker-wide memory limit.
+        final SystemUsage usage = new SystemUsage();
+        usage.getMemoryUsage().setLimit(100000000);
+        broker.setSystemUsage(usage);
+
+        // One destination policy for the ">" queue pattern, carrying the requested limit.
+        final PolicyEntry queuePolicy = new PolicyEntry();
+        queuePolicy.setQueue(">");
+        queuePolicy.setMemoryLimit(queueLimit);
+        final List<PolicyEntry> policies = new ArrayList<PolicyEntry>();
+        policies.add(queuePolicy);
+        final PolicyMap destinationPolicy = new PolicyMap();
+        destinationPolicy.setPolicyEntries(policies);
+        broker.setDestinationPolicy(destinationPolicy);
+
+        final TransportConnector transport = new TransportConnector();
+        transport.setUri(new URI(uri1));
+        transport.setName(brokerName + ".transportConnector");
+        broker.addConnector(transport);
+
+        if (uri2 == null) {
+            // No peers requested: stand-alone broker.
+            return broker;
+        }
+
+        final String peers = "static:" + uri2 + "," + uri3 + "," + uri4;
+        final NetworkConnector bridge = new DiscoveryNetworkConnector(new URI(peers));
+        bridge.setBridgeTempDestinations(true);
+        bridge.setBrokerName(brokerName);
+        // When using queue limits set this to 1
+        bridge.setPrefetchSize(1000);
+        bridge.setNetworkTTL(1);
+        broker.addNetworkConnector(bridge);
+
+        return broker;
+    }
+
+    /**
+     * Creates a non-transacted, auto-acknowledging listener container with
+     * {@code MAX_CONSUMERS} concurrent consumers. The container is only configured here;
+     * the caller is responsible for initialising it.
+     *
+     * @param acf connection factory the container obtains its connections from
+     * @param listener listener that receives every consumed message
+     * @param queue name of the destination to consume from
+     * @return the configured container
+     */
+    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf, final MessageListener listener, final String queue) {
+        final DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
+
+        // Where to consume from and who handles the messages.
+        listenerContainer.setConnectionFactory(acf);
+        listenerContainer.setDestinationName(queue);
+        listenerContainer.setMessageListener(listener);
+
+        // How the sessions behave.
+        listenerContainer.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+        listenerContainer.setSessionTransacted(false);
+        listenerContainer.setConcurrentConsumers(MAX_CONSUMERS);
+
+        return listenerContainer;
+    }
+
+    /**
+     * Creates an ActiveMQ connection factory for the given URL with the fixed set of
+     * client options this test runs with.
+     *
+     * @param url broker URL passed unchanged to the factory
+     * @return the configured connection factory
+     */
+    public ActiveMQConnectionFactory createConnectionFactory(final String url) {
+        final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
+        acf.setCopyMessageOnSend(false);
+        // Set once; the second, identical setUseAsyncSend(false) call was redundant.
+        acf.setUseAsyncSend(false);
+        acf.setDispatchAsync(true);
+        acf.setUseCompression(false);
+        acf.setOptimizeAcknowledge(false);
+        acf.setOptimizedMessageDispatch(true);
+
+        return acf;
+    }
+
+ private static class TestMessageListener1 implements MessageListener {
+ final AtomicInteger count = new AtomicInteger(0);
+ private final long waitTime;
+
+ public TestMessageListener1(long waitTime) {
+ this.waitTime = waitTime;
+ }
+
+ public void onMessage(Message msg) {
+
+ try {
+ /*
+ * log.info("Listener1 Consumed message " +
+ * msg.getIntProperty("count") + " from " +
+ * msg.getStringProperty("producerName"));
+ */
+ int value = count.incrementAndGet();
+ if (value % 1000 == 0) {
+ LOG.info("Consumed message: " + value);
+ }
+
+ Thread.sleep(waitTime);
+ LATCH.countDown();
+ /*
+ * } catch (JMSException e) { e.printStackTrace();
+ */
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+    /**
+     * Producer task that sends {@code NUM_MESSAGE_TO_SEND} non-persistent 2048-byte
+     * bytes messages to a queue through a {@link JmsTemplate} backed by a pooled
+     * connection factory. Declared static because it uses no state of the enclosing test.
+     */
+    private static class PooledProducerTask implements Runnable {
+        private final String queueName;
+        private final PooledConnectionFactory pcf;
+        private final String producerName;
+
+        /**
+         * @param pcf pooled connection factory handed to the {@link JmsTemplate}
+         * @param queueName name of the destination every message is sent to
+         * @param producerName value of the "producerName" property set on every message;
+         *        also used to identify this producer in the error log
+         */
+        public PooledProducerTask(final PooledConnectionFactory pcf, final String queueName, final String producerName) {
+            this.pcf = pcf;
+            this.queueName = queueName;
+            this.producerName = producerName;
+        }
+
+        /**
+         * Sends the messages one at a time. Any failure is logged and ends the task;
+         * nothing is rethrown to the caller.
+         */
+        public void run() {
+
+            try {
+
+                final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+                jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                jmsTemplate.setExplicitQosEnabled(true);
+                jmsTemplate.setMessageIdEnabled(false);
+                jmsTemplate.setMessageTimestampEnabled(false);
+                jmsTemplate.afterPropertiesSet();
+
+                // Random payload, generated once and reused for every message.
+                final byte[] bytes = new byte[2048];
+                final Random r = new Random();
+                r.nextBytes(bytes);
+
+                for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+                    final int count = i;
+                    jmsTemplate.send(queueName, new MessageCreator() {
+                        public Message createMessage(Session session) throws JMSException {
+
+                            final BytesMessage message = session.createBytesMessage();
+
+                            message.writeBytes(bytes);
+                            message.setIntProperty("count", count);
+                            message.setStringProperty("producerName", producerName);
+                            return message;
+                        }
+                    });
+                }
+            } catch (final Throwable e) {
+                // Report the actual producer; several of these tasks run concurrently.
+                LOG.error(producerName + " is exiting", e);
+            }
+        }
+    }
+}
Added: activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.test.*;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class AMQFailoverIssue extends org.apache.activemq.test.TestSupport {
+
+ // Transport URI of the single broker; the same value is passed to
+ // createConnectionFactory as the client URL.
+ private static final String URL1 = "tcp://localhost:61616";
+ // Queue used by both the producers and the listener container.
+ private static final String QUEUE1_NAME = "test.queue.1";
+ // Number of concurrent consumers configured on the listener container.
+ private static final int MAX_CONSUMERS = 10;
+ // Number of producer tasks submitted by the test.
+ private static final int MAX_PRODUCERS = 5;
+ // Number of messages sent by each producer task.
+ private static final int NUM_MESSAGE_TO_SEND = 10000;
+ // Number of messages the test expects the listener to receive in total.
+ private static final int TOTAL_MESSAGES = MAX_PRODUCERS * NUM_MESSAGE_TO_SEND;
+ // When true, createConnectionFactory wraps the broker URL in "failover:(...)".
+ private static final boolean USE_FAILOVER = true;
+ // Incremented by TestMessageListener1 for every message it receives.
+ private AtomicInteger messageCount = new AtomicInteger();
+ // Created by the test with TOTAL_MESSAGES counts; counted down by TestMessageListener1.
+ private CountDownLatch doneLatch;
+
+ // Empty override of the inherited setUp: the test method creates and starts its
+ // own broker.
+ @Override
+ public void setUp() throws Exception {
+ }
+
+ // Empty override of the inherited tearDown: the test method stops its broker and
+ // listener container itself in a finally block.
+ @Override
+ public void tearDown() throws Exception {
+ }
+
+    /**
+     * Starts one broker, attaches a listener container to {@code QUEUE1_NAME} and runs
+     * {@code MAX_PRODUCERS} pooled producers against it; passes when all
+     * {@code TOTAL_MESSAGES} messages reach the listener within 45 seconds.
+     * <p>
+     * This should fail with incubator-activemq-fuse-4.1.0.5
+     *
+     * @throws Exception if the broker or the listener container cannot be started or
+     *         stopped, or if a wait is interrupted
+     */
+    public void testFailoverIssue() throws Exception {
+        BrokerService brokerService1 = null;
+        DefaultMessageListenerContainer container1 = null;
+        ExecutorService executor = null;
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, null);
+            brokerService1.start();
+            final ActiveMQConnectionFactory acf = createConnectionFactory(URL1, USE_FAILOVER);
+            final PooledConnectionFactory pcf = new PooledConnectionFactory(acf);
+            // One count per expected message; the listener counts it down.
+            doneLatch = new CountDownLatch(TOTAL_MESSAGES);
+            container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(0), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+            // NOTE(review): presumably gives the listener container time to connect before
+            // the producers start - confirm.
+            Thread.sleep(5000);
+            executor = Executors.newCachedThreadPool();
+            for (int i = 0; i < MAX_PRODUCERS; i++) {
+                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+            }
+            // Wait for all message to arrive.
+            assertTrue(doneLatch.await(45, TimeUnit.SECONDS));
+            Assert.assertEquals(TOTAL_MESSAGES, messageCount.get());
+        } finally {
+            // Also on failure: interrupt producers that are still sending so that their
+            // threads do not outlive the test.
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+            try {
+                // Null when the broker failed to start; guard so the original failure is
+                // not masked by a NullPointerException.
+                if (container1 != null) {
+                    container1.stop();
+                    container1.destroy();
+                }
+            } finally {
+                if (brokerService1 != null) {
+                    brokerService1.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds (without starting) a non-persistent, JMX-enabled broker that listens on
+     * {@code uri1} and, when {@code uri2} is given, bridges to it through a static
+     * network connector.
+     *
+     * @param brokerName name of the broker; also the prefix of the transport connector name
+     * @param uri1 URI the broker's transport connector listens on
+     * @param uri2 peer URI for the network connector, or {@code null} for none
+     * @return the configured broker
+     * @throws Exception if a URI is malformed or a connector cannot be added
+     */
+    private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2) throws Exception {
+        final BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+
+        // Broker-wide memory limit.
+        final SystemUsage usage = new SystemUsage();
+        usage.getMemoryUsage().setLimit(5000000);
+        broker.setSystemUsage(usage);
+
+        // One destination policy for the ">" queue pattern with a memory limit of 1.
+        final PolicyEntry queuePolicy = new PolicyEntry();
+        queuePolicy.setQueue(">");
+        queuePolicy.setMemoryLimit(1);
+        final List<PolicyEntry> policies = new ArrayList<PolicyEntry>();
+        policies.add(queuePolicy);
+        final PolicyMap destinationPolicy = new PolicyMap();
+        destinationPolicy.setPolicyEntries(policies);
+        broker.setDestinationPolicy(destinationPolicy);
+
+        final TransportConnector transport = new TransportConnector();
+        transport.setUri(new URI(uri1));
+        transport.setName(brokerName + ".transportConnector");
+        broker.addConnector(transport);
+
+        if (uri2 == null) {
+            // No peer requested: stand-alone broker.
+            return broker;
+        }
+
+        final NetworkConnector bridge = new DiscoveryNetworkConnector(new URI("static:" + uri2));
+        bridge.setBridgeTempDestinations(true);
+        bridge.setBrokerName(brokerName);
+        bridge.setPrefetchSize(1);
+        broker.addNetworkConnector(bridge);
+        return broker;
+    }
+
+    /**
+     * Creates a non-transacted, auto-acknowledging listener container with
+     * {@code MAX_CONSUMERS} concurrent consumers. The container is only configured here;
+     * the caller is responsible for initialising it.
+     *
+     * @param acf connection factory the container obtains its connections from
+     * @param listener listener that receives every consumed message
+     * @param queue name of the destination to consume from
+     * @return the configured container
+     */
+    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf, final MessageListener listener, final String queue) {
+        final DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
+
+        // Where to consume from and who handles the messages.
+        listenerContainer.setConnectionFactory(acf);
+        listenerContainer.setDestinationName(queue);
+        listenerContainer.setMessageListener(listener);
+
+        // How the sessions behave.
+        listenerContainer.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+        listenerContainer.setSessionTransacted(false);
+        listenerContainer.setConcurrentConsumers(MAX_CONSUMERS);
+
+        return listenerContainer;
+    }
+
+    /**
+     * Creates an ActiveMQ connection factory with the fixed set of client options this
+     * test runs with.
+     *
+     * @param url broker URL
+     * @param useFailover when {@code true} the URL is wrapped as {@code failover:(url)},
+     *        otherwise it is used as given
+     * @return the configured connection factory
+     */
+    public ActiveMQConnectionFactory createConnectionFactory(final String url, final boolean useFailover) {
+        final String failoverUrl = "failover:(" + url + ")";
+        final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(useFailover ? failoverUrl : url);
+        acf.setCopyMessageOnSend(false);
+        // Set once; the second, identical setUseAsyncSend(false) call was redundant.
+        acf.setUseAsyncSend(false);
+        acf.setDispatchAsync(true);
+        acf.setUseCompression(false);
+        acf.setOptimizeAcknowledge(false);
+        acf.setOptimizedMessageDispatch(true);
+        return acf;
+    }
+
+    /**
+     * Listener that records every received message in the enclosing test's
+     * {@code messageCount} and {@code doneLatch}, then optionally sleeps to simulate
+     * processing time.
+     */
+    private class TestMessageListener1 implements MessageListener {
+
+        private final long waitTime;
+
+        /**
+         * @param waitTime milliseconds to sleep after each message has been counted
+         */
+        public TestMessageListener1(long waitTime) {
+            this.waitTime = waitTime;
+        }
+
+        /**
+         * Counts the message first and sleeps afterwards, so an interrupted sleep does
+         * not lose the count.
+         *
+         * @param msg the received message; its content is not inspected
+         */
+        public void onMessage(Message msg) {
+            try {
+                messageCount.incrementAndGet();
+                doneLatch.countDown();
+                Thread.sleep(waitTime);
+            } catch (InterruptedException e) {
+                // Restore the flag so the thread that invoked the listener can see the
+                // interrupt and shut down.
+                Thread.currentThread().interrupt();
+                e.printStackTrace();
+            }
+        }
+    }
+
+ private static class PooledProducerTask implements Runnable {
+
+ private final String queueName;
+ private final PooledConnectionFactory pcf;
+
+ public PooledProducerTask(final PooledConnectionFactory pcf, final String queueName) {
+ this.pcf = pcf;
+ this.queueName = queueName;
+ }
+
+ public void run() {
+ try {
+ final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+ jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setMessageIdEnabled(false);
+ jmsTemplate.setMessageTimestampEnabled(false);
+ jmsTemplate.afterPropertiesSet();
+ final byte[] bytes = new byte[2048];
+ final Random r = new Random();
+ r.nextBytes(bytes);
+ Thread.sleep(2000);
+ final AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+ jmsTemplate.send(queueName, new MessageCreator() {
+
+ public Message createMessage(Session session) throws JMSException {
+ final BytesMessage message = session.createBytesMessage();
+ message.writeBytes(bytes);
+ message.setIntProperty("count", count.incrementAndGet());
+ message.setStringProperty("producer", "pooled");
+ return message;
+ }
+ });
+ }
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=935952&r1=935951&r2=935952&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Tue Apr 20 15:05:29 2010
@@ -142,6 +142,7 @@
<module>activemq-ra</module>
<module>activemq-rar</module>
<module>activemq-run</module>
+ <module>activemq-spring</module>
<module>activemq-tooling</module>
<module>activemq-web</module>
<module>activemq-web-demo</module>