You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/04 04:41:53 UTC

svn commit: r383014 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java

Author: chirino
Date: Fri Mar  3 19:41:51 2006
New Revision: 383014

URL: http://svn.apache.org/viewcvs?rev=383014&view=rev
Log:
Fixing up http://jira.activemq.org/jira/browse/AMQ-615
- We now close the consumers and queue browsers created by a pooled session when the session is closed.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=383014&r1=383013&r2=383014&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java Fri Mar  3 19:41:51 2006
@@ -26,6 +26,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.pool.ObjectPool;
 
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -50,6 +52,7 @@
 import javax.jms.TopicSubscriber;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 /**
  * @version $Revision: 1.1 $
@@ -63,6 +66,9 @@
     private ActiveMQQueueSender queueSender;
     private ActiveMQTopicPublisher topicPublisher;
     private boolean transactional = true;
+    
+    private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
+    private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList();
 
     public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
         this.session = aSession;
@@ -76,6 +82,17 @@
 
         // lets reset the session
         getSession().setMessageListener(null);
+        
+        // Close any consumers and browsers that may have been created.
+        for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+            MessageConsumer consumer = (MessageConsumer) iter.next();
+            consumer.close();
+        }
+        
+        for (Iterator iter = browsers.iterator(); iter.hasNext();) {
+            QueueBrowser browser = (QueueBrowser) iter.next();
+            browser.close();
+        }
 
         // maybe do a rollback?
         if (transactional) {
@@ -182,31 +199,32 @@
     // Consumer related methods
     //-------------------------------------------------------------------------
     public QueueBrowser createBrowser(Queue queue) throws JMSException {
-        return getSession().createBrowser(queue);
+        return addQueueBrowser(getSession().createBrowser(queue));
     }
 
     public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
-        return getSession().createBrowser(queue, selector);
+        return addQueueBrowser(getSession().createBrowser(queue, selector));
     }
 
     public MessageConsumer createConsumer(Destination destination) throws JMSException {
-        return getSession().createConsumer(destination);
+        return addConsumer(getSession().createConsumer(destination));
     }
 
     public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
-        return getSession().createConsumer(destination, selector);
+        return addConsumer(getSession().createConsumer(destination, selector));
     }
 
     public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
-        return getSession().createConsumer(destination, selector, noLocal);
+        return addConsumer(getSession().createConsumer(destination, selector, noLocal));
     }
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
-        return getSession().createDurableSubscriber(topic, selector);
+        return addTopicSubscriber(getSession().createDurableSubscriber(topic, selector));
     }
 
+
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
-        return getSession().createDurableSubscriber(topic, name, selector, noLocal);
+        return addTopicSubscriber(getSession().createDurableSubscriber(topic, name, selector, noLocal));
     }
 
     public MessageListener getMessageListener() throws JMSException {
@@ -218,19 +236,19 @@
     }
 
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
-        return getSession().createSubscriber(topic);
+        return addTopicSubscriber(getSession().createSubscriber(topic));
     }
 
     public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
-        return getSession().createSubscriber(topic, selector, local);
+        return addTopicSubscriber(getSession().createSubscriber(topic, selector, local));
     }
 
     public QueueReceiver createReceiver(Queue queue) throws JMSException {
-        return getSession().createReceiver(queue);
+        return addQueueReceiver(getSession().createReceiver(queue));
     }
 
     public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
-        return getSession().createReceiver(queue, selector);
+        return addQueueReceiver(getSession().createReceiver(queue, selector));
     }
 
 
@@ -276,6 +294,23 @@
             topicPublisher = (ActiveMQTopicPublisher) getSession().createPublisher(null);
         }
         return topicPublisher;
+    }
+
+    private QueueBrowser addQueueBrowser(QueueBrowser browser) {
+        browsers.add(browser);
+        return browser;
+    }
+    private MessageConsumer addConsumer(MessageConsumer consumer) {
+        consumers.add(consumer);
+        return consumer;
+    }    
+    private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
+        consumers.add(subscriber);
+        return subscriber;
+    }
+    private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
+        consumers.add(receiver);
+        return receiver;
     }
 
     public String toString() {