You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/13 22:54:36 UTC

svn commit: r1325943 - in /activemq/trunk/activemq-pool/src: main/java/org/apache/activemq/pool/ test/java/org/apache/activemq/pool/

Author: tabish
Date: Fri Apr 13 20:54:35 2012
New Revision: 1325943

URL: http://svn.apache.org/viewvc?rev=1325943&view=rev
Log:
Apply Fix For: https://issues.apache.org/jira/browse/AMQ-3680

Added:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java   (with props)
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionTempDestCleanupTest.java   (with props)
Modified:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java?rev=1325943&r1=1325942&r2=1325943&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java Fri Apr 13 20:54:35 2012
@@ -53,8 +53,7 @@ public class ConnectionPool {
     public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
         this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
         // Add a transport Listener so that we can notice if this connection
-        // should be expired due to
-        // a connection failure.
+        // should be expired due to a connection failure.
         connection.addTransportListener(new TransportListener() {
             public void onCommand(Object command) {
             }
@@ -71,10 +70,9 @@ public class ConnectionPool {
             public void transportResumed() {
             }
         });
-        //
+
         // make sure that we set the hasFailed flag, in case the transport already failed
         // prior to the addition of our new TransportListener
-        //
         if(connection.isTransportFailed()) {
             hasFailed = true;
         }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java?rev=1325943&r1=1325942&r2=1325943&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java Fri Apr 13 20:54:35 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.pool;
 
+import java.util.concurrent.CopyOnWriteArrayList;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -27,6 +29,8 @@ import javax.jms.QueueConnection;
 import javax.jms.QueueSession;
 import javax.jms.ServerSessionPool;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
@@ -36,6 +40,8 @@ import org.apache.activemq.ActiveMQSessi
 import org.apache.activemq.AlreadyClosedException;
 import org.apache.activemq.EnhancedConnection;
 import org.apache.activemq.advisory.DestinationSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
@@ -47,12 +53,14 @@ import org.apache.activemq.advisory.Dest
  * library like <a href="http://jencks.org/">Jencks</a> such as in <a
  * href="http://jencks.org/Message+Driven+POJOs">this example</a>
  *
- *
  */
 public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
+    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
 
     private ConnectionPool pool;
     private boolean stopped;
+    private final CopyOnWriteArrayList<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
+    private final CopyOnWriteArrayList<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
 
     public PooledConnection(ConnectionPool pool) {
         this.pool = pool;
@@ -67,6 +75,7 @@ public class PooledConnection implements
     }
 
     public void close() throws JMSException {
+        this.cleanupConnectionTemporaryDestinations();
         if (this.pool != null) {
             this.pool.decrementReferenceCount();
             this.pool = null;
@@ -82,22 +91,17 @@ public class PooledConnection implements
         stopped = true;
     }
 
-    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector,
-                                                       ServerSessionPool serverSessionPool, int maxMessages)
-        throws JMSException {
-        return getConnection()
-            .createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
+            throws JMSException {
+        return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
     }
 
-    public ConnectionConsumer createConnectionConsumer(Topic topic, String s,
-                                                       ServerSessionPool serverSessionPool, int maxMessages)
-        throws JMSException {
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
         return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
     }
 
-    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1,
-                                                              ServerSessionPool serverSessionPool, int i)
-        throws JMSException {
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
+            throws JMSException {
         return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
     }
 
@@ -118,34 +122,49 @@ public class PooledConnection implements
     }
 
     public void setClientID(String clientID) throws JMSException {
-    	
-    	// ignore repeated calls to setClientID() with the same client id
-    	// this could happen when a JMS component such as Spring that uses a 
-    	// PooledConnectionFactory shuts down and reinitializes.
-    	//
+
+        // ignore repeated calls to setClientID() with the same client id
+        // this could happen when a JMS component such as Spring that uses a
+        // PooledConnectionFactory shuts down and reinitializes.
         if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
-        	getConnection().setClientID(clientID);
+            getConnection().setClientID(clientID);
         }
     }
 
-    public ConnectionConsumer createConnectionConsumer(Queue queue, String selector,
-                                                       ServerSessionPool serverSessionPool, int maxMessages)
-        throws JMSException {
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
         return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
     }
 
     // Session factory methods
     // -------------------------------------------------------------------------
     public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
-        return (QueueSession)createSession(transacted, ackMode);
+        return (QueueSession) createSession(transacted, ackMode);
     }
 
     public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
-        return (TopicSession)createSession(transacted, ackMode);
+        return (TopicSession) createSession(transacted, ackMode);
     }
 
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        return pool.createSession(transacted, ackMode);
+        PooledSession result;
+        result = (PooledSession) pool.createSession(transacted, ackMode);
+
+        // Add a temporary destination event listener to the session that notifies us when
+        // the session creates temporary destinations.
+        result.addTempDestEventListener(new PooledSessionEventListener() {
+
+            @Override
+            public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
+                connTempQueues.add(tempQueue);
+            }
+
+            @Override
+            public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
+                connTempTopics.add(tempTopic);
+            }
+        });
+
+        return (Session) result;
     }
 
     // EnhancedCollection API
@@ -170,10 +189,39 @@ public class PooledConnection implements
     }
 
     protected ActiveMQSession createSession(SessionKey key) throws JMSException {
-        return (ActiveMQSession)getConnection().createSession(key.isTransacted(), key.getAckMode());
+        return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
     }
 
     public String toString() {
         return "PooledConnection { " + pool + " }";
     }
+
+    /**
+     * Remove all of the temporary destinations created for this connection.
+     * This is important since the underlying connection may be reused over a
+     * long period of time, accumulating all of the temporary destinations from
+     * each use. However, from the client's view of the connection lifecycle,
+     * close() closes the connection and, therefore, deletes all of the
+     * temporary destinations created.
+     */
+    protected void cleanupConnectionTemporaryDestinations() {
+
+        for (TemporaryQueue tempQueue : connTempQueues) {
+            try {
+                tempQueue.delete();
+            } catch (JMSException ex) {
+                LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
+            }
+        }
+        connTempQueues.clear();
+
+        for (TemporaryTopic tempTopic : connTempTopics) {
+            try {
+                tempTopic.delete();
+            } catch (JMSException ex) {
+                LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
+            }
+        }
+        connTempTopics.clear();
+    }
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=1325943&r1=1325942&r2=1325943&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java Fri Apr 13 20:54:35 2012
@@ -34,6 +34,7 @@ import javax.jms.QueueBrowser;
 import javax.jms.QueueReceiver;
 import javax.jms.QueueSender;
 import javax.jms.QueueSession;
+import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
@@ -43,7 +44,6 @@ import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 import javax.jms.XASession;
-import javax.jms.Session;
 import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.ActiveMQMessageProducer;
@@ -54,9 +54,6 @@ import org.apache.activemq.AlreadyClosed
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- *
- */
 public class PooledSession implements Session, TopicSession, QueueSession, XASession {
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
 
@@ -70,6 +67,8 @@ public class PooledSession implements Se
 
     private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
     private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
+    private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
+        new CopyOnWriteArrayList<PooledSessionEventListener>();
     private boolean isXa;
 
     public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
@@ -78,6 +77,10 @@ public class PooledSession implements Se
         this.transactional = session.isTransacted();
     }
 
+    public void addTempDestEventListener(PooledSessionEventListener listener) {
+        this.tempDestEventListeners.add(listener);
+    }
+
     protected boolean isIgnoreClose() {
         return ignoreClose;
     }
@@ -121,6 +124,7 @@ public class PooledSession implements Se
                 consumers.clear();
                 browsers.clear();
             }
+
             if (invalidate) {
                 // lets close the session and not put the session back into
                 // the pool
@@ -172,11 +176,29 @@ public class PooledSession implements Se
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException {
-        return getInternalSession().createTemporaryQueue();
+        TemporaryQueue result;
+
+        result = getInternalSession().createTemporaryQueue();
+
+        // Notify all of the listeners of the created temporary Queue.
+        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+            listener.onTemporaryQueueCreate(result);
+        }
+
+        return result;
     }
 
     public TemporaryTopic createTemporaryTopic() throws JMSException {
-        return getInternalSession().createTemporaryTopic();
+        TemporaryTopic result;
+
+        result = getInternalSession().createTemporaryTopic();
+
+        // Notify all of the listeners of the created temporary Topic.
+        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+            listener.onTemporaryTopicCreate(result);
+        }
+
+        return result;
     }
 
     public void unsubscribe(String s) throws JMSException {
@@ -299,10 +321,12 @@ public class PooledSession implements Se
     /**
      * Callback invoked when the consumer is closed.
      * <p/>
-     * This is used to keep track of an explicit closed consumer created by this session,
-     * by which we know do not need to keep track of the consumer, as its already closed.
+     * This is used to learn when a consumer created by this session has been
+     * explicitly closed, at which point we no longer need to keep track of the
+     * consumer, as it is already closed.
      *
-     * @param consumer the consumer which is being closed
+     * @param consumer
+     *            the consumer which is being closed
      */
     protected void onConsumerClose(MessageConsumer consumer) {
         consumers.remove(consumer);
@@ -343,8 +367,10 @@ public class PooledSession implements Se
 
     private MessageConsumer addConsumer(MessageConsumer consumer) {
         consumers.add(consumer);
-        // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is invoked
-        // when the returned consumer is closed, to avoid memory leak in this session class
+        // must wrap in PooledMessageConsumer to ensure the onConsumerClose
+        // method is invoked
+        // when the returned consumer is closed, to avoid memory leak in this
+        // session class
         // in case many consumers is created
         return new PooledMessageConsumer(this, consumer);
     }

Added: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java?rev=1325943&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java (added)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java Fri Apr 13 20:54:35 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.pool;
+
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+
+interface PooledSessionEventListener {
+
+    /**
+     * Called on successful creation of a new TemporaryQueue.
+     *
+     * @param tempQueue
+     *            The TemporaryQueue just created.
+     */
+    void onTemporaryQueueCreate(TemporaryQueue tempQueue);
+
+    /**
+     * Called on successful creation of a new TemporaryTopic.
+     *
+     * @param tempTopic
+     *            The TemporaryTopic just created.
+     */
+    void onTemporaryTopicCreate(TemporaryTopic tempTopic);
+
+}

Propchange: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionTempDestCleanupTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionTempDestCleanupTest.java?rev=1325943&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionTempDestCleanupTest.java (added)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionTempDestCleanupTest.java Fri Apr 13 20:54:35 2012
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.pool;
+
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test of lingering temporary destinations on pooled connections when the
+ * underlying connections are reused. Also tests that closing one
+ * PooledConnection does not delete the temporary destinations of another
+ * PooledConnection that uses the same underlying ConnectionPool.
+ *
+ * jira: AMQ-3457
+ */
+public class PooledConnectionTempDestCleanupTest {
+
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionTempDestCleanupTest.class);
+
+    protected BrokerService embeddedBroker;
+
+    protected ActiveMQConnectionFactory directConnFact;
+    protected Connection directConn1;
+    protected Connection directConn2;
+
+    protected PooledConnectionFactory pooledConnFact;
+    protected Connection pooledConn1;
+    protected Connection pooledConn2;
+
+    protected TemporaryQueue tempDest;
+    protected TemporaryQueue otherTempDest;
+
+    /**
+     * Prepare to run a test case: create, configure, and start the embedded
+     * broker, as well as creating the client connections to the broker.
+     */
+    @Before
+    public void prepTest() throws java.lang.Exception {
+        embeddedBroker = new BrokerService();
+        configureBroker(embeddedBroker);
+        embeddedBroker.start();
+        embeddedBroker.waitUntilStarted();
+
+        // Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
+        directConnFact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
+        pooledConnFact = new PooledConnectionFactory(directConnFact);
+
+        // Prepare the connections
+        directConn1 = directConnFact.createConnection();
+        directConn1.start();
+        directConn2 = directConnFact.createConnection();
+        directConn2.start();
+
+        pooledConn1 = pooledConnFact.createConnection();
+        pooledConn1.start();
+        pooledConn2 = pooledConnFact.createConnection();
+        pooledConn2.start();
+    }
+
+    @After
+    public void cleanupTest() throws java.lang.Exception {
+        try {
+            pooledConn1.stop();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            pooledConn2.stop();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            directConn1.stop();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            directConn2.stop();
+        } catch (JMSException jms_exc) {
+        }
+
+        try {
+            embeddedBroker.stop();
+        } catch (JMSException jms_exc) {
+        }
+    }
+
+    protected void configureBroker(BrokerService broker_svc) throws Exception {
+        broker_svc.setBrokerName("testbroker1");
+        broker_svc.setUseJmx(false);
+        broker_svc.setPersistent(false);
+    }
+
+    /**
+     * Test for lingering temporary destinations after closing a
+     * PooledConnection. Here are the steps:
+     *
+     * 1. create a session on the first pooled connection 2. create a session on
+     * the second pooled connection 3. create a temporary destination on the
+     * first session 4. confirm the temporary destination exists in the broker
+     * 5. close the first connection 6. check that the temporary destination no
+     * longer exists in the broker
+     */
+    @Test
+    public void testPooledLingeringTempDests() throws java.lang.Exception {
+        Session session1;
+        Session session2;
+
+        session1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2 = pooledConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        tempDest = session1.createTemporaryQueue();
+
+        assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(tempDest));
+
+        pooledConn1.close();
+
+        assertTrue("FAILED: temp dest from closed pooled connection is lingering", !destinationExists(tempDest));
+
+        session2.close();
+    }
+
+    /**
+     * Test that closing one PooledConnection does not delete the temporary
+     * destinations of another.
+     *
+     * 1. create a session on the first pooled connection 2. create a session on
+     * the second pooled connection 3. create a temporary destination on the
+     * first session 4. create a temporary destination on the second session 5.
+     * confirm both temporary destinations exist in the broker 6. close the
+     * first connection 7. check that the first temporary destination no longer
+     * exists in the broker 8. check that the second temporary destination does
+     * still exist in the broker
+     */
+    @Test
+    public void testPooledTempDestsCleanupOverzealous() throws java.lang.Exception {
+        Session session1;
+        Session session2;
+
+        session1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2 = pooledConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        tempDest = session1.createTemporaryQueue();
+        otherTempDest = session2.createTemporaryQueue();
+
+        assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(tempDest));
+        assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(otherTempDest));
+
+        pooledConn1.close();
+
+        // Now confirm the first temporary destination no longer exists and the
+        // second does.
+        assertTrue("FAILED: temp dest from closed pooled connection is lingering", !destinationExists(tempDest));
+        assertTrue("FAILED: second PooledConnectin's temporary destination was incorrectly deleted", destinationExists(otherTempDest));
+    }
+
+    /**
+     * CONTROL CASE
+     *
+     * Test for lingering temporary destinations after closing a Connection that
+     * is NOT pooled. This demonstrates the standard JMS operation and helps to
+     * validate the test methodology.
+     *
+     * 1. create a session on the first direct connection 2. create a session on
+     * the second direct connection 3. create a temporary destination on the
+     * first session 4. confirm the destination exists in the broker 5. close
+     * the first connection 6. check that the destination no longer exists in
+     * the broker
+     */
+    @Test
+    public void testDirectLingeringTempDests() throws java.lang.Exception {
+        Session session1;
+        Session session2;
+
+        session1 = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2 = directConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        tempDest = session1.createTemporaryQueue();
+
+        assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(tempDest));
+
+        directConn1.close();
+
+        // Now confirm the temporary destination no longer exists.
+        assertTrue("CONTROL TEST FAILURE - TEST METHOD IS SUSPECT", (!destinationExists(tempDest)));
+
+        session2.close();
+    }
+
+    private boolean destinationExists(Destination dest) throws Exception {
+        RegionBroker rb = (RegionBroker) embeddedBroker.getBroker().getAdaptor(RegionBroker.class);
+        return rb.getTopicRegion().getDestinationMap().containsKey(dest) || rb.getQueueRegion().getDestinationMap().containsKey(dest)
+                || rb.getTempTopicRegion().getDestinationMap().containsKey(dest) || rb.getTempQueueRegion().getDestinationMap().containsKey(dest);
+    }
+}

Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionTempDestCleanupTest.java
------------------------------------------------------------------------------
    svn:eol-style = native