You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/04/20 17:05:30 UTC

svn commit: r935952 - in /activemq/trunk: ./ activemq-core/src/main/java/org/apache/activemq/xbean/ activemq-pool/ activemq-pool/src/main/java/org/apache/activemq/pool/ activemq-pool/src/test/java/org/apache/activemq/usecases/ activemq-spring/ activemq...

Author: dejanb
Date: Tue Apr 20 15:05:29 2010
New Revision: 935952

URL: http://svn.apache.org/viewvc?rev=935952&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2702 - introducing activemq-spring module

Added:
    activemq/trunk/activemq-spring/   (with props)
    activemq/trunk/activemq-spring/pom.xml   (with props)
    activemq/trunk/activemq-spring/src/
    activemq/trunk/activemq-spring/src/main/
    activemq/trunk/activemq-spring/src/main/java/
    activemq/trunk/activemq-spring/src/main/java/org/
    activemq/trunk/activemq-spring/src/main/java/org/apache/
    activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/
    activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/
    activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java
    activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/spring/
    activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/
    activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java
    activemq/trunk/activemq-spring/src/main/resources/
    activemq/trunk/activemq-spring/src/test/
    activemq/trunk/activemq-spring/src/test/java/
    activemq/trunk/activemq-spring/src/test/java/org/
    activemq/trunk/activemq-spring/src/test/java/org/apache/
    activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/
    activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/
    activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java
    activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java
    activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
    activemq/trunk/activemq-spring/src/test/resources/
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/
Modified:
    activemq/trunk/activemq-pool/pom.xml
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-pool/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/pom.xml?rev=935952&r1=935951&r2=935952&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/pom.xml (original)
+++ activemq/trunk/activemq-pool/pom.xml Tue Apr 20 15:05:29 2010
@@ -35,7 +35,6 @@
       javax.transaction*;resolution:=optional,
       org.apache.activemq.ra*;resolution:=optional,
       org.apache.geronimo.transaction.manager*;resolution:=optional,
-      org.springframework*;resolution:=optional,
       *
     </activemq.osgi.import.pkg>
     <activemq.osgi.export>
@@ -80,10 +79,6 @@
       <artifactId>commons-pool</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.springframework</groupId>
-      <artifactId>spring-beans</artifactId>
-    </dependency>
-    <dependency>
       <groupId>${pom.groupId}</groupId>
       <artifactId>activemq-core</artifactId>
       <type>test-jar</type>
@@ -95,25 +90,10 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.springframework</groupId>
-      <artifactId>spring-jms</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-    	<groupId>org.apache.xbean</groupId>
-    	<artifactId>xbean-spring</artifactId>
-    	<scope>test</scope>
-    </dependency>
-    <dependency>
     	<groupId>log4j</groupId>
     	<artifactId>log4j</artifactId>
     	<scope>test</scope>
     </dependency>
-    <dependency>
-    	<groupId>org.apache.derby</groupId>
-    	<artifactId>derby</artifactId>
-    	<scope>test</scope>
-    </dependency>
   </dependencies>
 
 </project>

Propchange: activemq/trunk/activemq-spring/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Apr 20 15:05:29 2010
@@ -0,0 +1,4 @@
+.classpath
+.settings
+.project
+target

Added: activemq/trunk/activemq-spring/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/pom.xml?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/pom.xml (added)
+++ activemq/trunk/activemq-spring/pom.xml Tue Apr 20 15:05:29 2010
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-parent</artifactId>
+    <version>5.4-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>activemq-spring</artifactId>
+  <packaging>bundle</packaging>
+  <name>ActiveMQ :: Spring</name>
+  <description>ActiveMQ Spring Integration</description>
+
+  <properties>
+    <activemq.osgi.import.pkg>
+      javax.transaction*;resolution:=optional,
+      org.apache.geronimo.transaction.manager*;resolution:=optional,
+      org.springframework*;resolution:=optional,
+      *
+    </activemq.osgi.import.pkg>
+    <activemq.osgi.export>
+      org.apache.activemq.pool*;version=${project.version};-noimport:=;-split-package:=merge-last,
+      org.apache.activemq.xbean*;version=${project.version};-noimport:=true;-split-package:=merge-last
+    </activemq.osgi.export>
+  </properties>
+
+  <dependencies>
+
+    <!-- =============================== -->
+    <!-- Required Dependencies -->
+    <!-- =============================== -->
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${pom.groupId}</groupId>
+      <artifactId>activemq-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${pom.groupId}</groupId>
+      <artifactId>activemq-pool</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.components</groupId>
+      <artifactId>geronimo-transaction</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-annotation_1.0_spec</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>commons-pool</groupId>
+      <artifactId>commons-pool</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-beans</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${pom.groupId}</groupId>
+      <artifactId>activemq-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-jms</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+    	<groupId>org.apache.xbean</groupId>
+    	<artifactId>xbean-spring</artifactId>
+    	<scope>test</scope>
+    </dependency>
+    <dependency>
+    	<groupId>log4j</groupId>
+    	<artifactId>log4j</artifactId>
+    	<scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

Propchange: activemq/trunk/activemq-spring/pom.xml
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java (added)
+++ activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.jms.ConnectionFactory;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pool.ObjectPoolFactory;
+import org.springframework.beans.factory.FactoryBean;
+
+/**
+ * Spring {@link FactoryBean} that creates a pooled JMS connection factory.
+ * Depending on the properties set, it will create a simple pool,
+ * a transaction aware connection pool, or a jca aware connection pool:
+ * <ul>
+ *   <li><code>transactionManager</code> and <code>resourceName</code> set:
+ *       a {@link JcaPooledConnectionFactory};</li>
+ *   <li>only <code>transactionManager</code> set: an {@link XaPooledConnectionFactory};</li>
+ *   <li>otherwise, or when the variants above could not be created:
+ *       a plain {@link PooledConnectionFactory}.</li>
+ * </ul>
+ *
+ * <pre class="code">
+ * &lt;bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean"&gt;
+ *   &lt;property name="connectionFactory" ref="connectionFactory" /&gt;
+ *   &lt;property name="transactionManager" ref="transactionManager" /&gt;
+ *   &lt;property name="resourceName" value="ResourceName" /&gt;
+ * &lt;/bean&gt;
+ * </pre>
+ *
+ * The <code>resourceName</code> property should be used along with the {@link ActiveMQResourceManager} and have
+ * the same value as its <code>resourceName</code> property. This will make sure the transaction manager
+ * correctly maps the connection factory to the recovery process.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class PooledConnectionFactoryBean implements FactoryBean {
+
+    private static final Log LOGGER = LogFactory.getLog(PooledConnectionFactoryBean.class);
+
+    /** The factory handed out by {@link #getObject()}; built by {@link #afterPropertiesSet()}. */
+    private PooledConnectionFactory pooledConnectionFactory;
+    /** The underlying connection factory that the pool wraps. */
+    private ConnectionFactory connectionFactory;
+    private int maxConnections = 1;
+    private int maximumActive = 500;
+    // Held as Object and cast to TransactionManager only when a transactional pool is built.
+    // NOTE(review): presumably so the bean can be configured without JTA types - confirm.
+    private Object transactionManager;
+    private String resourceName;
+    private ObjectPoolFactory poolFactory;
+
+    public int getMaxConnections() {
+        return maxConnections;
+    }
+
+    public void setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    public int getMaximumActive() {
+        return maximumActive;
+    }
+
+    public void setMaximumActive(int maximumActive) {
+        this.maximumActive = maximumActive;
+    }
+
+    public Object getTransactionManager() {
+        return transactionManager;
+    }
+
+    public void setTransactionManager(Object transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
+    public String getResourceName() {
+        return resourceName;
+    }
+
+    public void setResourceName(String resourceName) {
+        this.resourceName = resourceName;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public ObjectPoolFactory getPoolFactory() {
+        return poolFactory;
+    }
+
+    public void setPoolFactory(ObjectPoolFactory poolFactory) {
+        this.poolFactory = poolFactory;
+    }
+
+    /**
+     * Builds the pooled connection factory from the configured properties.
+     * The JCA variant is tried first, then the XA variant, then the plain pool;
+     * each attempt is skipped once a factory has been created. A failed attempt
+     * is logged at debug level and the next variant is tried.
+     *
+     * @throws IllegalStateException if none of the variants could be created
+     * @throws Exception declared for the init-method contract
+     * @org.apache.xbean.InitMethod
+     */
+    @PostConstruct
+    public void afterPropertiesSet() throws Exception {
+        if (pooledConnectionFactory == null && transactionManager != null && resourceName != null) {
+            try {
+                LOGGER.debug("Trying to build a JcaPooledConnectionFactory");
+                JcaPooledConnectionFactory f = new JcaPooledConnectionFactory();
+                f.setName(resourceName);
+                f.setTransactionManager((TransactionManager) transactionManager);
+                applyPoolSettings(f);
+                this.pooledConnectionFactory = f;
+            } catch (Throwable t) {
+                // Deliberately broad: any failure here just means we fall back to the next variant.
+                LOGGER.debug("Could not create JCA enabled connection factory: " + t, t);
+            }
+        }
+        if (pooledConnectionFactory == null && transactionManager != null) {
+            try {
+                LOGGER.debug("Trying to build a XaPooledConnectionFactory");
+                XaPooledConnectionFactory f = new XaPooledConnectionFactory();
+                f.setTransactionManager((TransactionManager) transactionManager);
+                applyPoolSettings(f);
+                this.pooledConnectionFactory = f;
+            } catch (Throwable t) {
+                // Deliberately broad: fall back to the plain pool below.
+                LOGGER.debug("Could not create XA enabled connection factory: " + t, t);
+            }
+        }
+        if (pooledConnectionFactory == null) {
+            try {
+                LOGGER.debug("Trying to build a PooledConnectionFactory");
+                PooledConnectionFactory f = new PooledConnectionFactory();
+                applyPoolSettings(f);
+                this.pooledConnectionFactory = f;
+            } catch (Throwable t) {
+                LOGGER.debug("Could not create pooled connection factory: " + t, t);
+            }
+        }
+        if (pooledConnectionFactory == null) {
+            throw new IllegalStateException("Unable to create pooled connection factory.  Enable DEBUG log level for more information");
+        }
+    }
+
+    /** Applies the pool settings that are common to every factory variant. */
+    private void applyPoolSettings(PooledConnectionFactory factory) {
+        factory.setMaxConnections(maxConnections);
+        factory.setMaximumActive(maximumActive);
+        factory.setConnectionFactory(connectionFactory);
+        factory.setPoolFactory(poolFactory);
+    }
+
+    /**
+     * Stops the pooled connection factory, if one was created, and forgets it.
+     *
+     * @throws Exception if stopping the pool fails
+     * @org.apache.xbean.DestroyMethod
+     */
+    @PreDestroy
+    public void destroy() throws Exception {
+        if (pooledConnectionFactory != null) {
+            pooledConnectionFactory.stop();
+            pooledConnectionFactory = null;
+        }
+    }
+
+    // FactoryBean methods
+
+    /** Returns the pooled factory, or <code>null</code> if it has not been built yet or was destroyed. */
+    public Object getObject() throws Exception {
+        return pooledConnectionFactory;
+    }
+
+    public Class getObjectType() {
+        return ConnectionFactory.class;
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+}

Added: activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java (added)
+++ activemq/trunk/activemq-spring/src/main/java/org/apache/activemq/xbean/PooledBrokerFactoryBean.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.xbean;
+
+import java.util.HashMap;
+
+import org.apache.activemq.broker.BrokerService;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.core.io.Resource;
+
+/**
+ * Used to share a single broker even if you have multiple broker bean
+ * definitions. A use case is where you have multiple web applications that want
+ * to start an embedded broker but only the first one to deploy should actually
+ * start it.
+ * <p>
+ * Brokers are shared per configuration file name ({@link Resource#getFilename()})
+ * and reference counted: the first bean for a given name creates the broker,
+ * the last bean to be destroyed shuts it down.
+ * 
+ * @version $Revision$
+ */
+public class PooledBrokerFactoryBean implements FactoryBean, InitializingBean, DisposableBean {
+
+    /** Shared brokers keyed by config file name; every access synchronizes on this map. */
+    static final HashMap<String, SharedBroker> SHARED_BROKER_MAP = new HashMap<String, SharedBroker>();
+
+    private boolean start;
+    private Resource config;
+
+    /** A broker factory together with the number of beans currently sharing it. */
+    static class SharedBroker {
+        BrokerFactoryBean factory;
+        int refCount;
+    }
+
+    /**
+     * Registers this bean with the shared broker for its config, creating and
+     * initializing that broker first when no other bean has done so yet.
+     */
+    public void afterPropertiesSet() throws Exception {
+        synchronized (SHARED_BROKER_MAP) {
+            // NOTE(review): keyed by file name only, so equally named configs in different
+            // locations share one broker - confirm this is intended.
+            final String key = config.getFilename();
+            SharedBroker sharedBroker = SHARED_BROKER_MAP.get(key);
+            if (sharedBroker == null) {
+                sharedBroker = new SharedBroker();
+                sharedBroker.factory = new BrokerFactoryBean();
+                sharedBroker.factory.setConfig(config);
+                sharedBroker.factory.setStart(start);
+                sharedBroker.factory.afterPropertiesSet();
+                // Registered only after initialization succeeded.
+                SHARED_BROKER_MAP.put(key, sharedBroker);
+            }
+            sharedBroker.refCount++;
+        }
+    }
+
+    /**
+     * Releases this bean's reference; the broker is destroyed when the last
+     * reference goes away.
+     */
+    public void destroy() throws Exception {
+        synchronized (SHARED_BROKER_MAP) {
+            final String key = config.getFilename();
+            SharedBroker sharedBroker = SHARED_BROKER_MAP.get(key);
+            if (sharedBroker != null) {
+                sharedBroker.refCount--;
+                if (sharedBroker.refCount == 0) {
+                    try {
+                        sharedBroker.factory.destroy();
+                    } finally {
+                        // Always unregister, even when shutdown fails; otherwise a dead
+                        // broker with refCount 0 would be handed to the next bean.
+                        SHARED_BROKER_MAP.remove(key);
+                    }
+                }
+            }
+        }
+    }
+
+    public Resource getConfig() {
+        return config;
+    }
+
+    /**
+     * Returns the object produced by the shared broker factory, or
+     * <code>null</code> when no broker is registered for this config.
+     */
+    public Object getObject() throws Exception {
+        synchronized (SHARED_BROKER_MAP) {
+            SharedBroker sharedBroker = SHARED_BROKER_MAP.get(config.getFilename());
+            if (sharedBroker != null) {
+                return sharedBroker.factory.getObject();
+            }
+        }
+        return null;
+    }
+
+    public Class getObjectType() {
+        return BrokerService.class;
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public boolean isStart() {
+        return start;
+    }
+
+    public void setConfig(Resource config) {
+        this.config = config;
+    }
+
+    public void setStart(boolean start) {
+        this.start = start;
+    }
+
+}

Added: activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTest3.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
+
+    private static final transient Log LOG = LogFactory.getLog(AMQDeadlockTest3.class);
+
+    private static final String URL1 = "tcp://localhost:61616";
+
+    private static final String URL2 = "tcp://localhost:61617";
+
+    private static final String QUEUE1_NAME = "test.queue.1";
+
+    private static final String QUEUE2_NAME = "test.queue.2";
+
+    private static final int MAX_CONSUMERS = 1;
+
+    private static final int MAX_PRODUCERS = 1;
+
+    private static final int NUM_MESSAGE_TO_SEND = 10;
+
+    private AtomicInteger messageCount = new AtomicInteger();
+    private CountDownLatch doneLatch;
+
+    // Empty: each test method in this class creates and starts its own brokers and
+    // listener containers. NOTE(review): presumably overrides a fixture hook inherited
+    // from TestSupport - confirm.
+    public void setUp() throws Exception {
+    }
+
+    // Empty: each test method stops its own brokers and listener containers in a
+    // finally block. NOTE(review): presumably overrides a fixture hook inherited from
+    // TestSupport - confirm.
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * One broker, with the producer tasks sharing a single pooled connection
+     * factory. Only QUEUE1 has a listener while QUEUE2 is left to fill up; the
+     * test passes when the expected number of messages reaches the QUEUE1
+     * listener within 20 seconds.
+     */
+    // This should fail with incubator-activemq-fuse-4.1.0.5
+    public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
+
+        BrokerService brokerService1 = null;
+        DefaultMessageListenerContainer container1 = null;
+        ExecutorService executor = null;
+
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, null);
+            brokerService1.start();
+
+            final ActiveMQConnectionFactory acf = createConnectionFactory(URL1);
+            final PooledConnectionFactory pcf = new PooledConnectionFactory(acf);
+
+            // Only listen on the first queue.. let the 2nd queue fill up.
+            // The latch is sized like the sibling tests so it follows MAX_PRODUCERS.
+            doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
+            container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+
+            Thread.sleep(2000);
+
+            executor = Executors.newCachedThreadPool();
+            for (int i = 0; i < MAX_PRODUCERS; i++) {
+                executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+                Thread.sleep(1000);
+                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+            }
+
+            // Wait for all message to arrive.
+            assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+            executor.shutdownNow();
+
+            Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
+
+        } finally {
+            // Every step is guarded: when setup fails early the later resources are still
+            // null, and an NPE thrown from here would hide the original failure.
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+            try {
+                if (container1 != null) {
+                    container1.stop();
+                    container1.destroy();
+                }
+            } finally {
+                // Stop the broker even when stopping the container failed.
+                if (brokerService1 != null) {
+                    brokerService1.stop();
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Two brokers networked to each other. The producer tasks share one pooled
+     * connection factory on broker1, while the only listener consumes QUEUE1
+     * through broker2; QUEUE2 has no consumer. The test passes when the expected
+     * number of messages reaches the listener within 20 seconds.
+     */
+    // This should fail with incubator-activemq-fuse-4.1.0.5
+    public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing() throws Exception {
+
+        BrokerService brokerService1 = null;
+        BrokerService brokerService2 = null;
+        DefaultMessageListenerContainer container1 = null;
+        ExecutorService executor = null;
+
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, URL2);
+            brokerService1.start();
+            brokerService2 = createBrokerService("broker2", URL2, URL1);
+            brokerService2.start();
+
+            final ActiveMQConnectionFactory acf1 = createConnectionFactory(URL1);
+            final ActiveMQConnectionFactory acf2 = createConnectionFactory(URL2);
+
+            final PooledConnectionFactory pcf = new PooledConnectionFactory(acf1);
+
+            Thread.sleep(1000);
+
+            doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
+            container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+
+            executor = Executors.newCachedThreadPool();
+            for (int i = 0; i < MAX_PRODUCERS; i++) {
+                executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+                Thread.sleep(1000);
+                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+            }
+
+            assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+            executor.shutdownNow();
+
+            Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
+        } finally {
+            // Every step is guarded: when setup fails early the later resources are still
+            // null, and an NPE thrown from here would hide the original failure.
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+            try {
+                if (container1 != null) {
+                    container1.stop();
+                    container1.destroy();
+                }
+            } finally {
+                // Stop both brokers even when an earlier cleanup step failed.
+                try {
+                    if (brokerService1 != null) {
+                        brokerService1.stop();
+                    }
+                } finally {
+                    if (brokerService2 != null) {
+                        brokerService2.stop();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Two brokers networked to each other. The producer tasks use the plain
+     * (non-pooled) connection factory of broker1; broker2 hosts a QUEUE1 listener
+     * that sleeps 500 ms per message and a QUEUE2 listener that sleeps 30000 ms
+     * per message. The test passes when the shared latch reaches zero within
+     * 20 seconds.
+     */
+    // This should fail with incubator-activemq-fuse-4.1.0.5
+    public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing() throws Exception {
+
+        BrokerService brokerService1 = null;
+        BrokerService brokerService2 = null;
+        DefaultMessageListenerContainer container1 = null;
+        DefaultMessageListenerContainer container2 = null;
+        ExecutorService executor = null;
+
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, URL2);
+            brokerService1.start();
+            brokerService2 = createBrokerService("broker2", URL2, URL1);
+            brokerService2.start();
+
+            final ActiveMQConnectionFactory acf1 = createConnectionFactory(URL1);
+            final ActiveMQConnectionFactory acf2 = createConnectionFactory(URL2);
+
+            Thread.sleep(1000);
+
+            doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND * MAX_PRODUCERS);
+
+            container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+            container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
+            container2.afterPropertiesSet();
+
+            executor = Executors.newCachedThreadPool();
+            for (int i = 0; i < MAX_PRODUCERS; i++) {
+                executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
+                Thread.sleep(1000);
+                executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
+            }
+
+            assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+            executor.shutdownNow();
+
+            Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
+        } finally {
+            // Every step is guarded: when setup fails early the later resources are still
+            // null, and an NPE thrown from here would hide the original failure. The nested
+            // finally blocks make sure one failing cleanup step does not skip the rest.
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+            try {
+                if (container1 != null) {
+                    container1.stop();
+                    container1.destroy();
+                }
+            } finally {
+                try {
+                    if (container2 != null) {
+                        container2.stop();
+                        container2.destroy();
+                    }
+                } finally {
+                    try {
+                        if (brokerService1 != null) {
+                            brokerService1.stop();
+                        }
+                    } finally {
+                        if (brokerService2 != null) {
+                            brokerService2.stop();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds, without starting, a non-persistent JMX-enabled broker that
+     * listens on <code>uri1</code>. A memory limit is set on the broker as a
+     * whole and a much smaller one on every queue.
+     *
+     * @param brokerName name of the broker; also used to name its connectors
+     * @param uri1 URI the broker's transport connector listens on
+     * @param uri2 URI of a peer broker to bridge to via a static network
+     *            connector, or <code>null</code> for a stand-alone broker
+     * @return the configured, not yet started broker
+     * @throws Exception if a URI is malformed or a connector cannot be added
+     */
+    private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2) throws Exception {
+        final BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+
+        // Memory limit for the broker as a whole.
+        final SystemUsage usage = new SystemUsage();
+        usage.getMemoryUsage().setLimit(5000000);
+        broker.setSystemUsage(usage);
+
+        // A single policy entry matching every queue (">"), with a small memory limit.
+        final PolicyEntry allQueues = new PolicyEntry();
+        allQueues.setQueue(">");
+        allQueues.setMemoryLimit(1000);
+        final List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+        entries.add(allQueues);
+        final PolicyMap policies = new PolicyMap();
+        policies.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policies);
+
+        final TransportConnector transport = new TransportConnector();
+        transport.setUri(new URI(uri1));
+        transport.setName(brokerName + ".transportConnector");
+        broker.addConnector(transport);
+
+        if (uri2 == null) {
+            return broker;
+        }
+
+        // Bridge to the peer broker.
+        final NetworkConnector bridge = new DiscoveryNetworkConnector(new URI("static:" + uri2));
+        bridge.setBridgeTempDestinations(true);
+        bridge.setBrokerName(brokerName);
+        broker.addNetworkConnector(bridge);
+        return broker;
+    }
+
+    /**
+     * Creates a listener container that consumes {@code queue} with {@code MAX_CONSUMERS}
+     * concurrent consumers, using non-transacted, auto-acknowledged sessions.
+     * The container is only configured here; initialising it is left to the caller.
+     *
+     * @param acf connection factory the container obtains its connections from
+     * @param listener listener invoked for every received message
+     * @param queue name of the destination to consume from
+     * @return the configured container
+     */
+    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf, final MessageListener listener, final String queue) {
+        final DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
+
+        // Where to connect and what to deliver to.
+        listenerContainer.setConnectionFactory(acf);
+        listenerContainer.setDestinationName(queue);
+        listenerContainer.setMessageListener(listener);
+
+        // Session behaviour and consumer concurrency.
+        listenerContainer.setSessionTransacted(false);
+        listenerContainer.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+        listenerContainer.setConcurrentConsumers(MAX_CONSUMERS);
+
+        return listenerContainer;
+    }
+
+    /**
+     * Creates a connection factory for {@code url} with the send and dispatch options
+     * this test runs with: synchronous sends, asynchronous dispatch, and no message
+     * copying, compression or optimized acknowledgement.
+     *
+     * @param url broker URL handed to the {@link ActiveMQConnectionFactory} constructor
+     * @return the configured factory
+     */
+    public ActiveMQConnectionFactory createConnectionFactory(final String url) {
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+
+        factory.setCopyMessageOnSend(false);
+        factory.setUseAsyncSend(false);
+        factory.setDispatchAsync(true);
+        factory.setUseCompression(false);
+        factory.setOptimizeAcknowledge(false);
+        factory.setOptimizedMessageDispatch(true);
+        factory.setAlwaysSyncSend(true);
+
+        return factory;
+    }
+
+    private class TestMessageListener1 implements MessageListener {
+
+        private final long waitTime;
+
+        public TestMessageListener1(long waitTime) {
+            this.waitTime = waitTime;
+
+        }
+
+        public void onMessage(Message msg) {
+
+            try {
+                LOG.info("Listener1 Consumed message " + msg.getIntProperty("count"));
+
+                messageCount.incrementAndGet();
+                doneLatch.countDown();
+
+                Thread.sleep(waitTime);
+            } catch (JMSException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+
+        }
+    }
+
+    /**
+     * Producer that sends {@code NUM_MESSAGE_TO_SEND} non-persistent messages with a
+     * 2048-byte payload to a queue, using a {@link JmsTemplate} on top of a
+     * {@link PooledConnectionFactory}.
+     */
+    private static class PooledProducerTask implements Runnable {
+
+        private final String queueName;
+
+        private final PooledConnectionFactory pcf;
+
+        /**
+         * @param pcf pooled connection factory used for sending
+         * @param queueName name of the queue to send to
+         */
+        public PooledProducerTask(final PooledConnectionFactory pcf, final String queueName) {
+            this.pcf = pcf;
+            this.queueName = queueName;
+        }
+
+        /**
+         * Waits two seconds, then sends the messages. Each message carries a running
+         * {@code count} property and {@code producer=pooled}. Any failure is logged and
+         * ends the task.
+         */
+        public void run() {
+            try {
+                final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+                jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                jmsTemplate.setExplicitQosEnabled(true);
+                jmsTemplate.setMessageIdEnabled(false);
+                jmsTemplate.setMessageTimestampEnabled(false);
+                jmsTemplate.afterPropertiesSet();
+
+                // Random payload, generated once and reused for every message.
+                final byte[] bytes = new byte[2048];
+                final Random r = new Random();
+                r.nextBytes(bytes);
+
+                Thread.sleep(2000);
+
+                // Incremented inside the creator callback, read afterwards for logging.
+                final AtomicInteger count = new AtomicInteger();
+                for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+                    jmsTemplate.send(queueName, new MessageCreator() {
+
+                        public Message createMessage(Session session) throws JMSException {
+                            final BytesMessage message = session.createBytesMessage();
+                            message.writeBytes(bytes);
+                            message.setIntProperty("count", count.incrementAndGet());
+                            message.setStringProperty("producer", "pooled");
+                            return message;
+                        }
+                    });
+
+                    LOG.info("PooledProducer sent message: " + count.get());
+                }
+            } catch (final InterruptedException e) {
+                // Keep the interrupt visible to the executor instead of clearing it.
+                Thread.currentThread().interrupt();
+                LOG.error("Producer 1 is exiting", e);
+            } catch (final Throwable e) {
+                LOG.error("Producer 1 is exiting", e);
+            }
+        }
+    }
+
+    private static class NonPooledProducerTask implements Runnable {
+
+        private final String queueName;
+
+        private final ConnectionFactory cf;
+
+        public NonPooledProducerTask(final ConnectionFactory cf, final String queueName) {
+            this.cf = cf;
+            this.queueName = queueName;
+        }
+
+        public void run() {
+
+            try {
+
+                final JmsTemplate jmsTemplate = new JmsTemplate(cf);
+                jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                jmsTemplate.setExplicitQosEnabled(true);
+                jmsTemplate.setMessageIdEnabled(false);
+                jmsTemplate.setMessageTimestampEnabled(false);
+                jmsTemplate.afterPropertiesSet();
+
+                final byte[] bytes = new byte[2048];
+                final Random r = new Random();
+                r.nextBytes(bytes);
+
+                Thread.sleep(2000);
+
+                final AtomicInteger count = new AtomicInteger();
+                for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+                    jmsTemplate.send(queueName, new MessageCreator() {
+
+                        public Message createMessage(Session session) throws JMSException {
+
+                            final BytesMessage message = session.createBytesMessage();
+
+                            message.writeBytes(bytes);
+                            message.setIntProperty("count", count.incrementAndGet());
+                            message.setStringProperty("producer", "non-pooled");
+                            return message;
+                        }
+                    });
+
+                    LOG.info("Non-PooledProducer sent message: " + count.get());
+
+                    // Thread.sleep(1000);
+                }
+
+            } catch (final Throwable e) {
+                LOG.error("Producer 1 is exiting", e);
+            }
+        }
+    }
+
+}

Added: activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.test.*;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+/**
+ * Scenario with four brokers networked to each other, one pooled producer per broker
+ * and a single listener container attached to the second broker, all working on one queue.
+ */
+public class AMQDeadlockTestW4Brokers extends org.apache.activemq.test.TestSupport {
+
+    private static final transient Log LOG = LogFactory.getLog(AMQDeadlockTestW4Brokers.class);
+
+    private static final String BROKER_URL1 = "tcp://localhost:61616";
+    private static final String BROKER_URL2 = "tcp://localhost:61617";
+    private static final String BROKER_URL3 = "tcp://localhost:61618";
+    private static final String BROKER_URL4 = "tcp://localhost:61619";
+    private static final String URL1 = "tcp://localhost:61616?wireFormat.cacheEnabled=false"
+                                       + "&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
+    private static final String URL2 = "tcp://localhost:61617?wireFormat.cacheEnabled=false"
+                                       + "&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
+    private static final String URL3 = "tcp://localhost:61618?wireFormat.cacheEnabled=false"
+                                       + "&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
+    private static final String URL4 = "tcp://localhost:61619?wireFormat.cacheEnabled=false"
+                                       + "&wireFormat.tightEncodingEnabled=false&wireFormat.maxInactivityDuration=30000&wireFormat.tcpNoDelayEnabled=false";
+    private static final String QUEUE1_NAME = "test.queue.1";
+    private static final int MAX_CONSUMERS = 5;
+    private static final int NUM_MESSAGE_TO_SEND = 10000;
+    private static final CountDownLatch LATCH = new CountDownLatch(MAX_CONSUMERS * NUM_MESSAGE_TO_SEND);
+
+    /** Empty override: brokers and clients are created inside the test method itself. */
+    @Override
+    public void setUp() throws Exception {
+
+    }
+
+    /** Empty override: the test method cleans up its own brokers and clients. */
+    @Override
+    public void tearDown() throws Exception {
+
+    }
+
+    /**
+     * Starts four brokers that are statically networked to each other, attaches a
+     * listener container to the second broker and runs one pooled producer per broker
+     * against the same queue.
+     * <p>
+     * Failures during setup propagate to the test runner, and cleanup only touches
+     * what was actually created.
+     *
+     * @throws Exception if a broker or client cannot be created, started or stopped
+     */
+    public void test4BrokerWithOutLingo() throws Exception {
+
+        BrokerService brokerService1 = null;
+        BrokerService brokerService2 = null;
+        BrokerService brokerService3 = null;
+        BrokerService brokerService4 = null;
+        DefaultMessageListenerContainer container1 = null;
+        ExecutorService executor = null;
+
+        try {
+
+            // Test with and without queue limits.
+            brokerService1 = createBrokerService("broker1", BROKER_URL1, BROKER_URL2, BROKER_URL3, BROKER_URL4, 0 /* 10000000 */);
+            brokerService1.start();
+            brokerService2 = createBrokerService("broker2", BROKER_URL2, BROKER_URL1, BROKER_URL3, BROKER_URL4, 0/* 40000000 */);
+            brokerService2.start();
+            brokerService3 = createBrokerService("broker3", BROKER_URL3, BROKER_URL2, BROKER_URL1, BROKER_URL4, 0/* 10000000 */);
+            brokerService3.start();
+            brokerService4 = createBrokerService("broker4", BROKER_URL4, BROKER_URL1, BROKER_URL3, BROKER_URL2, 0/* 10000000 */);
+            brokerService4.start();
+
+            final ActiveMQConnectionFactory acf1 = createConnectionFactory(failoverUrl(URL1));
+            final ActiveMQConnectionFactory acf2 = createConnectionFactory(failoverUrl(URL2));
+            final ActiveMQConnectionFactory acf3 = createConnectionFactory(failoverUrl(URL3));
+            final ActiveMQConnectionFactory acf4 = createConnectionFactory(failoverUrl(URL4));
+
+            final PooledConnectionFactory pcf1 = new PooledConnectionFactory(acf1);
+            final PooledConnectionFactory pcf2 = new PooledConnectionFactory(acf2);
+            final PooledConnectionFactory pcf3 = new PooledConnectionFactory(acf3);
+            final PooledConnectionFactory pcf4 = new PooledConnectionFactory(acf4);
+
+            // The only consumer side: a listener container on the second broker.
+            container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(0), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+
+            final PooledProducerTask[] tasks = {
+                new PooledProducerTask(pcf1, QUEUE1_NAME, "producer1"),
+                new PooledProducerTask(pcf2, QUEUE1_NAME, "producer2"),
+                new PooledProducerTask(pcf3, QUEUE1_NAME, "producer3"),
+                new PooledProducerTask(pcf4, QUEUE1_NAME, "producer4")
+            };
+
+            executor = Executors.newCachedThreadPool();
+            for (final PooledProducerTask task : tasks) {
+                executor.submit(task);
+            }
+
+            LATCH.await(15, TimeUnit.SECONDS);
+            // NOTE(review): this expects the latch to still hold its initial count after
+            // the wait, i.e. that the listener counted nothing down - confirm that this
+            // is really the intended check.
+            assertTrue(LATCH.getCount() == MAX_CONSUMERS * NUM_MESSAGE_TO_SEND);
+        } finally {
+
+            // Stop the producers first so no thread keeps sending into stopping brokers.
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+
+            if (container1 != null) {
+                container1.stop();
+                container1.destroy();
+            }
+
+            stopBroker(brokerService1);
+            stopBroker(brokerService2);
+            stopBroker(brokerService3);
+            stopBroker(brokerService4);
+        }
+    }
+
+    /** Wraps {@code url} in the failover transport URL used by every client of this test. */
+    private static String failoverUrl(final String url) {
+        return "failover:(" + url
+               + ")?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&backOffMultiplier=2&maxReconnectAttempts=0&randomize=false";
+    }
+
+    /** Stops {@code broker} if it was created; a {@code null} broker is ignored. */
+    private static void stopBroker(final BrokerService broker) throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    /**
+     * Builds (but does not start) a non-persistent, JMX-enabled broker with a transport
+     * connector on {@code uri1} and, when {@code uri2} is not {@code null}, one static
+     * network connector towards {@code uri2}, {@code uri3} and {@code uri4}.
+     *
+     * @param brokerName name of the broker; also used to derive the transport connector name
+     * @param uri1 URI the transport connector is bound to
+     * @param uri2 first peer URI, or {@code null} to skip the network connector
+     * @param uri3 second peer URI
+     * @param uri4 third peer URI
+     * @param queueLimit memory limit set on the policy entry for the {@code ">"} queue pattern
+     * @return the configured broker, not yet started
+     * @throws Exception if a URI cannot be parsed or a connector cannot be added
+     */
+    private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2, final String uri3, final String uri4, final int queueLimit)
+        throws Exception {
+        final BrokerService brokerService = new BrokerService();
+
+        brokerService.setBrokerName(brokerName);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+
+        final SystemUsage memoryManager = new SystemUsage();
+        memoryManager.getMemoryUsage().setLimit(100000000);
+        brokerService.setSystemUsage(memoryManager);
+
+        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+
+        final PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+        entry.setMemoryLimit(queueLimit);
+        policyEntries.add(entry);
+
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setPolicyEntries(policyEntries);
+        brokerService.setDestinationPolicy(policyMap);
+
+        final TransportConnector tConnector = new TransportConnector();
+        tConnector.setUri(new URI(uri1));
+        tConnector.setName(brokerName + ".transportConnector");
+        brokerService.addConnector(tConnector);
+
+        if (uri2 != null) {
+            final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2 + "," + uri3 + "," + uri4));
+            nc.setBridgeTempDestinations(true);
+            nc.setBrokerName(brokerName);
+
+            // When using queue limits set this to 1
+            nc.setPrefetchSize(1000);
+            nc.setNetworkTTL(1);
+            brokerService.addNetworkConnector(nc);
+        }
+
+        return brokerService;
+    }
+
+    /**
+     * Creates a listener container that consumes {@code queue} with {@code MAX_CONSUMERS}
+     * concurrent consumers, using non-transacted, auto-acknowledged sessions.
+     *
+     * @param acf connection factory the container obtains its connections from
+     * @param listener listener invoked for every received message
+     * @param queue name of the destination to consume from
+     * @return the configured container; initialising it is left to the caller
+     */
+    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf, final MessageListener listener, final String queue) {
+        final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+        container.setConnectionFactory(acf);
+        container.setDestinationName(queue);
+        container.setMessageListener(listener);
+        container.setSessionTransacted(false);
+        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+        container.setConcurrentConsumers(MAX_CONSUMERS);
+        return container;
+    }
+
+    /**
+     * Creates a connection factory for {@code url} with synchronous sends, asynchronous
+     * dispatch, and no message copying, compression or optimized acknowledgement.
+     *
+     * @param url broker URL handed to the {@link ActiveMQConnectionFactory} constructor
+     * @return the configured factory
+     */
+    public ActiveMQConnectionFactory createConnectionFactory(final String url) {
+        final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
+        acf.setCopyMessageOnSend(false);
+        // NOTE(review): useAsyncSend=false used to be set a second time at the end of
+        // this method; possibly setAlwaysSyncSend(true) was intended there - confirm.
+        acf.setUseAsyncSend(false);
+        acf.setDispatchAsync(true);
+        acf.setUseCompression(false);
+        acf.setOptimizeAcknowledge(false);
+        acf.setOptimizedMessageDispatch(true);
+
+        return acf;
+    }
+
+    /**
+     * Listener that counts deliveries, logs every 1000th one, sleeps for a configurable
+     * time and then counts down the shared {@code LATCH}.
+     */
+    private static class TestMessageListener1 implements MessageListener {
+        final AtomicInteger count = new AtomicInteger(0);
+        private final long waitTime;
+
+        /**
+         * @param waitTime time in milliseconds to sleep after each message
+         */
+        public TestMessageListener1(long waitTime) {
+            this.waitTime = waitTime;
+        }
+
+        public void onMessage(Message msg) {
+
+            try {
+                int value = count.incrementAndGet();
+                if (value % 1000 == 0) {
+                    LOG.info("Consumed message: " + value);
+                }
+
+                Thread.sleep(waitTime);
+                LATCH.countDown();
+            } catch (InterruptedException e) {
+                // Restore the interrupt status so the thread that invoked the listener
+                // can still observe the interruption.
+                Thread.currentThread().interrupt();
+                LOG.error("Listener1 was interrupted while waiting", e);
+            }
+        }
+    }
+
+    /**
+     * Producer that sends {@code NUM_MESSAGE_TO_SEND} non-persistent messages with a
+     * 2048-byte payload to a queue through a pooled connection factory.
+     */
+    private static class PooledProducerTask implements Runnable {
+        private final String queueName;
+        private final PooledConnectionFactory pcf;
+        private final String producerName;
+
+        /**
+         * @param pcf pooled connection factory used for sending
+         * @param queueName name of the queue to send to
+         * @param producerName value of the {@code producerName} property set on each message
+         */
+        public PooledProducerTask(final PooledConnectionFactory pcf, final String queueName, final String producerName) {
+            this.pcf = pcf;
+            this.queueName = queueName;
+            this.producerName = producerName;
+        }
+
+        /**
+         * Sends the messages, tagging each with its index as {@code count} and with the
+         * producer's name. Any failure is logged and ends the task.
+         */
+        public void run() {
+
+            try {
+
+                final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+                jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                jmsTemplate.setExplicitQosEnabled(true);
+                jmsTemplate.setMessageIdEnabled(false);
+                jmsTemplate.setMessageTimestampEnabled(false);
+                jmsTemplate.afterPropertiesSet();
+
+                // Random payload, generated once and reused for every message.
+                final byte[] bytes = new byte[2048];
+                final Random r = new Random();
+                r.nextBytes(bytes);
+
+                for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+                    final int count = i;
+                    jmsTemplate.send(queueName, new MessageCreator() {
+                        public Message createMessage(Session session) throws JMSException {
+
+                            final BytesMessage message = session.createBytesMessage();
+
+                            message.writeBytes(bytes);
+                            message.setIntProperty("count", count);
+                            message.setStringProperty("producerName", producerName);
+                            return message;
+                        }
+                    });
+                }
+            } catch (final Throwable e) {
+                // Name the failing producer so the four tasks can be told apart in the log.
+                LOG.error(producerName + " is exiting", e);
+            }
+        }
+    }
+}

Added: activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java?rev=935952&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java Tue Apr 20 15:05:29 2010
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.test.*;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class AMQFailoverIssue extends org.apache.activemq.test.TestSupport {
+
+    private static final String URL1 = "tcp://localhost:61616";
+    private static final String QUEUE1_NAME = "test.queue.1";
+    private static final int MAX_CONSUMERS = 10;
+    private static final int MAX_PRODUCERS = 5;
+    private static final int NUM_MESSAGE_TO_SEND = 10000;
+    private static final int TOTAL_MESSAGES = MAX_PRODUCERS * NUM_MESSAGE_TO_SEND;
+    private static final boolean USE_FAILOVER = true;
+    private AtomicInteger messageCount = new AtomicInteger();
+    private CountDownLatch doneLatch;
+
+    @Override
+    public void setUp() throws Exception {
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+    }
+
+    // This should fail with incubator-activemq-fuse-4.1.0.5
+    public void testFailoverIssue() throws Exception {
+        BrokerService brokerService1 = null;
+        ActiveMQConnectionFactory acf;
+        PooledConnectionFactory pcf;
+        DefaultMessageListenerContainer container1 = null;
+        try {
+            brokerService1 = createBrokerService("broker1", URL1, null);
+            brokerService1.start();
+            acf = createConnectionFactory(URL1, USE_FAILOVER);
+            pcf = new PooledConnectionFactory(acf);
+            // Only listen on the first queue.. let the 2nd queue fill up.
+            doneLatch = new CountDownLatch(TOTAL_MESSAGES);
+            container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(0), QUEUE1_NAME);
+            container1.afterPropertiesSet();
+            Thread.sleep(5000);
+            final ExecutorService executor = Executors.newCachedThreadPool();
+            for (int i = 0; i < MAX_PRODUCERS; i++) {
+                executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+            }
+            // Wait for all message to arrive.
+            assertTrue(doneLatch.await(45, TimeUnit.SECONDS));
+            executor.shutdown();
+            // Thread.sleep(30000);
+            Assert.assertEquals(TOTAL_MESSAGES, messageCount.get());
+        } finally {
+            container1.stop();
+            container1.destroy();
+            container1 = null;
+            brokerService1.stop();
+            brokerService1 = null;
+        }
+    }
+
+    private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2) throws Exception {
+        final BrokerService brokerService = new BrokerService();
+        brokerService.setBrokerName(brokerName);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        final SystemUsage memoryManager = new SystemUsage();
+        memoryManager.getMemoryUsage().setLimit(5000000);
+        brokerService.setSystemUsage(memoryManager);
+        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+        final PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+        // entry.setQueue(QUEUE1_NAME);
+        entry.setMemoryLimit(1);
+        policyEntries.add(entry);
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setPolicyEntries(policyEntries);
+        brokerService.setDestinationPolicy(policyMap);
+        final TransportConnector tConnector = new TransportConnector();
+        tConnector.setUri(new URI(uri1));
+        tConnector.setName(brokerName + ".transportConnector");
+        brokerService.addConnector(tConnector);
+        if (uri2 != null) {
+            final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
+            nc.setBridgeTempDestinations(true);
+            nc.setBrokerName(brokerName);
+            nc.setPrefetchSize(1);
+            brokerService.addNetworkConnector(nc);
+        }
+        return brokerService;
+    }
+
+    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf, final MessageListener listener, final String queue) {
+        final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+        container.setConnectionFactory(acf);
+        container.setDestinationName(queue);
+        container.setMessageListener(listener);
+        container.setSessionTransacted(false);
+        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+        container.setConcurrentConsumers(MAX_CONSUMERS);
+        return container;
+    }
+
+    public ActiveMQConnectionFactory createConnectionFactory(final String url, final boolean useFailover) {
+        final String failoverUrl = "failover:(" + url + ")";
+        final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(useFailover ? failoverUrl : url);
+        acf.setCopyMessageOnSend(false);
+        acf.setUseAsyncSend(false);
+        acf.setDispatchAsync(true);
+        acf.setUseCompression(false);
+        acf.setOptimizeAcknowledge(false);
+        acf.setOptimizedMessageDispatch(true);
+        acf.setUseAsyncSend(false);
+        return acf;
+    }
+
+    private class TestMessageListener1 implements MessageListener {
+
+        private final long waitTime;
+
+        public TestMessageListener1(long waitTime) {
+            this.waitTime = waitTime;
+        }
+
+        public void onMessage(Message msg) {
+            try {
+                messageCount.incrementAndGet();
+                doneLatch.countDown();
+                Thread.sleep(waitTime);
+            } catch (InterruptedException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private static class PooledProducerTask implements Runnable {
+
+        private final String queueName;
+        private final PooledConnectionFactory pcf;
+
+        public PooledProducerTask(final PooledConnectionFactory pcf, final String queueName) {
+            this.pcf = pcf;
+            this.queueName = queueName;
+        }
+
+        public void run() {
+            try {
+                final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+                jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                jmsTemplate.setExplicitQosEnabled(true);
+                jmsTemplate.setMessageIdEnabled(false);
+                jmsTemplate.setMessageTimestampEnabled(false);
+                jmsTemplate.afterPropertiesSet();
+                final byte[] bytes = new byte[2048];
+                final Random r = new Random();
+                r.nextBytes(bytes);
+                Thread.sleep(2000);
+                final AtomicInteger count = new AtomicInteger();
+                for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+                    jmsTemplate.send(queueName, new MessageCreator() {
+
+                        public Message createMessage(Session session) throws JMSException {
+                            final BytesMessage message = session.createBytesMessage();
+                            message.writeBytes(bytes);
+                            message.setIntProperty("count", count.incrementAndGet());
+                            message.setStringProperty("producer", "pooled");
+                            return message;
+                        }
+                    });
+                }
+            } catch (final Throwable e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=935952&r1=935951&r2=935952&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Tue Apr 20 15:05:29 2010
@@ -142,6 +142,7 @@
     <module>activemq-ra</module>
     <module>activemq-rar</module>
     <module>activemq-run</module>
+    <module>activemq-spring</module>
     <module>activemq-tooling</module>
     <module>activemq-web</module>
     <module>activemq-web-demo</module>