You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gn...@apache.org on 2009/02/03 23:10:59 UTC

svn commit: r740476 - in /activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool: JcaConnectionPool.java PooledSession.java SessionPool.java XaConnectionPool.java

Author: gnodet
Date: Tue Feb  3 22:10:59 2009
New Revision: 740476

URL: http://svn.apache.org/viewvc?rev=740476&view=rev
Log:
AMQ-2095: Make pooled sessions implement XASession for specific transaction management

Modified:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java?rev=740476&r1=740475&r2=740476&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java Tue Feb  3 22:10:59 2009
@@ -34,7 +34,7 @@
     }
 
     protected XAResource createXaResource(PooledSession session) throws JMSException {
-        XAResource xares = new LocalAndXATransaction(session.getSession().getTransactionContext());
+        XAResource xares = new LocalAndXATransaction(session.getInternalSession().getTransactionContext());
         if (name != null) {
             xares = new WrapperNamedXAResource(xares, name);
         }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=740476&r1=740475&r2=740476&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java Tue Feb  3 22:10:59 2009
@@ -42,6 +42,9 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import javax.jms.XASession;
+import javax.jms.Session;
+import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.ActiveMQMessageProducer;
 import org.apache.activemq.ActiveMQQueueSender;
@@ -54,7 +57,7 @@
 /**
  * @version $Revision: 1.1 $
  */
-public class PooledSession implements TopicSession, QueueSession {
+public class PooledSession implements Session, TopicSession, QueueSession, XASession {
     private static final transient Log LOG = LogFactory.getLog(PooledSession.class);
 
     private ActiveMQSession session;
@@ -87,7 +90,7 @@
             // TODO a cleaner way to reset??
 
             // lets reset the session
-            getSession().setMessageListener(null);
+            getInternalSession().setMessageListener(null);
 
             // Close any consumers and browsers that may have been created.
             for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
@@ -105,7 +108,7 @@
             // maybe do a rollback?
             if (transactional) {
                 try {
-                    getSession().rollback();
+                    getInternalSession().rollback();
                 } catch (JMSException e) {
                     LOG.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
 
@@ -126,75 +129,86 @@
     }
 
     public void commit() throws JMSException {
-        getSession().commit();
+        getInternalSession().commit();
     }
 
     public BytesMessage createBytesMessage() throws JMSException {
-        return getSession().createBytesMessage();
+        return getInternalSession().createBytesMessage();
     }
 
     public MapMessage createMapMessage() throws JMSException {
-        return getSession().createMapMessage();
+        return getInternalSession().createMapMessage();
     }
 
     public Message createMessage() throws JMSException {
-        return getSession().createMessage();
+        return getInternalSession().createMessage();
     }
 
     public ObjectMessage createObjectMessage() throws JMSException {
-        return getSession().createObjectMessage();
+        return getInternalSession().createObjectMessage();
     }
 
     public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
-        return getSession().createObjectMessage(serializable);
+        return getInternalSession().createObjectMessage(serializable);
     }
 
     public Queue createQueue(String s) throws JMSException {
-        return getSession().createQueue(s);
+        return getInternalSession().createQueue(s);
     }
 
     public StreamMessage createStreamMessage() throws JMSException {
-        return getSession().createStreamMessage();
+        return getInternalSession().createStreamMessage();
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException {
-        return getSession().createTemporaryQueue();
+        return getInternalSession().createTemporaryQueue();
     }
 
     public TemporaryTopic createTemporaryTopic() throws JMSException {
-        return getSession().createTemporaryTopic();
+        return getInternalSession().createTemporaryTopic();
     }
 
     public void unsubscribe(String s) throws JMSException {
-        getSession().unsubscribe(s);
+        getInternalSession().unsubscribe(s);
     }
 
     public TextMessage createTextMessage() throws JMSException {
-        return getSession().createTextMessage();
+        return getInternalSession().createTextMessage();
     }
 
     public TextMessage createTextMessage(String s) throws JMSException {
-        return getSession().createTextMessage(s);
+        return getInternalSession().createTextMessage(s);
     }
 
     public Topic createTopic(String s) throws JMSException {
-        return getSession().createTopic(s);
+        return getInternalSession().createTopic(s);
     }
 
     public int getAcknowledgeMode() throws JMSException {
-        return getSession().getAcknowledgeMode();
+        return getInternalSession().getAcknowledgeMode();
     }
 
     public boolean getTransacted() throws JMSException {
-        return getSession().getTransacted();
+        return getInternalSession().getTransacted();
     }
 
     public void recover() throws JMSException {
-        getSession().recover();
+        getInternalSession().recover();
     }
 
     public void rollback() throws JMSException {
-        getSession().rollback();
+        getInternalSession().rollback();
+    }
+
+    public XAResource getXAResource() {
+        if (session == null) {
+            throw new IllegalStateException("Session is closed");
+        }
+        return session.getTransactionContext();
+    }
+
+    public Session getSession() {
+        return this;
     }
 
     public void run() {
@@ -206,55 +220,55 @@
     // Consumer related methods
     // -------------------------------------------------------------------------
     public QueueBrowser createBrowser(Queue queue) throws JMSException {
-        return addQueueBrowser(getSession().createBrowser(queue));
+        return addQueueBrowser(getInternalSession().createBrowser(queue));
     }
 
     public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
-        return addQueueBrowser(getSession().createBrowser(queue, selector));
+        return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
     }
 
     public MessageConsumer createConsumer(Destination destination) throws JMSException {
-        return addConsumer(getSession().createConsumer(destination));
+        return addConsumer(getInternalSession().createConsumer(destination));
     }
 
     public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
-        return addConsumer(getSession().createConsumer(destination, selector));
+        return addConsumer(getInternalSession().createConsumer(destination, selector));
     }
 
     public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
-        return addConsumer(getSession().createConsumer(destination, selector, noLocal));
+        return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
     }
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
-        return addTopicSubscriber(getSession().createDurableSubscriber(topic, selector));
+        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
     }
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
-        return addTopicSubscriber(getSession().createDurableSubscriber(topic, name, selector, noLocal));
+        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
     }
 
     public MessageListener getMessageListener() throws JMSException {
-        return getSession().getMessageListener();
+        return getInternalSession().getMessageListener();
     }
 
     public void setMessageListener(MessageListener messageListener) throws JMSException {
-        getSession().setMessageListener(messageListener);
+        getInternalSession().setMessageListener(messageListener);
     }
 
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
-        return addTopicSubscriber(getSession().createSubscriber(topic));
+        return addTopicSubscriber(getInternalSession().createSubscriber(topic));
     }
 
     public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
-        return addTopicSubscriber(getSession().createSubscriber(topic, selector, local));
+        return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
     }
 
     public QueueReceiver createReceiver(Queue queue) throws JMSException {
-        return addQueueReceiver(getSession().createReceiver(queue));
+        return addQueueReceiver(getInternalSession().createReceiver(queue));
     }
 
     public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
-        return addQueueReceiver(getSession().createReceiver(queue, selector));
+        return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
     }
 
     // Producer related methods
@@ -273,7 +287,7 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected ActiveMQSession getSession() throws AlreadyClosedException {
+    protected ActiveMQSession getInternalSession() throws AlreadyClosedException {
         if (session == null) {
             throw new AlreadyClosedException("The session has already been closed");
         }
@@ -282,21 +296,21 @@
 
     public ActiveMQMessageProducer getMessageProducer() throws JMSException {
         if (messageProducer == null) {
-            messageProducer = (ActiveMQMessageProducer)getSession().createProducer(null);
+            messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
         }
         return messageProducer;
     }
 
     public ActiveMQQueueSender getQueueSender() throws JMSException {
         if (queueSender == null) {
-            queueSender = (ActiveMQQueueSender)getSession().createSender(null);
+            queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
         }
         return queueSender;
     }
 
     public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
         if (topicPublisher == null) {
-            topicPublisher = (ActiveMQTopicPublisher)getSession().createPublisher(null);
+            topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
         }
         return topicPublisher;
     }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java?rev=740476&r1=740475&r2=740476&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java Tue Feb  3 22:10:59 2009
@@ -78,7 +78,7 @@
 
     public void destroyObject(Object o) throws Exception {
         PooledSession session = (PooledSession)o;
-        session.getSession().close();
+        session.getInternalSession().close();
     }
 
     public boolean validateObject(Object o) {

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java?rev=740476&r1=740475&r2=740476&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java Tue Feb  3 22:10:59 2009
@@ -69,7 +69,7 @@
     }
 
     protected XAResource createXaResource(PooledSession session) throws JMSException {
-        return session.getSession().getTransactionContext();
+        return session.getXAResource();
     }