You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/04 04:41:53 UTC
svn commit: r383014 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java
Author: chirino
Date: Fri Mar 3 19:41:51 2006
New Revision: 383014
URL: http://svn.apache.org/viewcvs?rev=383014&view=rev
Log:
Fixing up http://jira.activemq.org/jira/browse/AMQ-615
- We now close the consumers and queue browsers created by a pooled session when the session is closed.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=383014&r1=383013&r2=383014&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java Fri Mar 3 19:41:51 2006
@@ -26,6 +26,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool.ObjectPool;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -50,6 +52,7 @@
import javax.jms.TopicSubscriber;
import java.io.Serializable;
+import java.util.Iterator;
/**
* @version $Revision: 1.1 $
@@ -63,6 +66,9 @@
private ActiveMQQueueSender queueSender;
private ActiveMQTopicPublisher topicPublisher;
private boolean transactional = true;
+
+ private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
+ private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList();
public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
this.session = aSession;
@@ -76,6 +82,17 @@
// lets reset the session
getSession().setMessageListener(null);
+
+ // Close any consumers and browsers that may have been created.
+ for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+ MessageConsumer consumer = (MessageConsumer) iter.next();
+ consumer.close();
+ }
+
+ for (Iterator iter = browsers.iterator(); iter.hasNext();) {
+ QueueBrowser browser = (QueueBrowser) iter.next();
+ browser.close();
+ }
// maybe do a rollback?
if (transactional) {
@@ -182,31 +199,32 @@
// Consumer related methods
//-------------------------------------------------------------------------
public QueueBrowser createBrowser(Queue queue) throws JMSException {
- return getSession().createBrowser(queue);
+ return addQueueBrowser(getSession().createBrowser(queue));
}
public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
- return getSession().createBrowser(queue, selector);
+ return addQueueBrowser(getSession().createBrowser(queue, selector));
}
public MessageConsumer createConsumer(Destination destination) throws JMSException {
- return getSession().createConsumer(destination);
+ return addConsumer(getSession().createConsumer(destination));
}
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
- return getSession().createConsumer(destination, selector);
+ return addConsumer(getSession().createConsumer(destination, selector));
}
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
- return getSession().createConsumer(destination, selector, noLocal);
+ return addConsumer(getSession().createConsumer(destination, selector, noLocal));
}
public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
- return getSession().createDurableSubscriber(topic, selector);
+ return addTopicSubscriber(getSession().createDurableSubscriber(topic, selector));
}
+
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
- return getSession().createDurableSubscriber(topic, name, selector, noLocal);
+ return addTopicSubscriber(getSession().createDurableSubscriber(topic, name, selector, noLocal));
}
public MessageListener getMessageListener() throws JMSException {
@@ -218,19 +236,19 @@
}
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
- return getSession().createSubscriber(topic);
+ return addTopicSubscriber(getSession().createSubscriber(topic));
}
public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
- return getSession().createSubscriber(topic, selector, local);
+ return addTopicSubscriber(getSession().createSubscriber(topic, selector, local));
}
public QueueReceiver createReceiver(Queue queue) throws JMSException {
- return getSession().createReceiver(queue);
+ return addQueueReceiver(getSession().createReceiver(queue));
}
public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
- return getSession().createReceiver(queue, selector);
+ return addQueueReceiver(getSession().createReceiver(queue, selector));
}
@@ -276,6 +294,23 @@
topicPublisher = (ActiveMQTopicPublisher) getSession().createPublisher(null);
}
return topicPublisher;
+ }
+
+ private QueueBrowser addQueueBrowser(QueueBrowser browser) {
+ browsers.add(browser);
+ return browser;
+ }
+ private MessageConsumer addConsumer(MessageConsumer consumer) {
+ consumers.add(consumer);
+ return consumer;
+ }
+ private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
+ consumers.add(subscriber);
+ return subscriber;
+ }
+ private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
+ consumers.add(receiver);
+ return receiver;
}
public String toString() {