You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by gn...@apache.org on 2014/06/12 15:00:03 UTC

svn commit: r1602148 [2/2] - in /aries/trunk/transaction/transaction-jms: ./ src/main/java/org/apache/aries/transaction/jms/ src/main/java/org/apache/aries/transaction/jms/internal/ src/main/resources/OSGI-INF/blueprint/

Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java Thu Jun 12 13:00:02 2014
@@ -23,34 +23,42 @@ import java.util.concurrent.CopyOnWriteA
 import javax.jms.*;
 import javax.transaction.xa.XAResource;
 
+import org.apache.commons.pool.KeyedObjectPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PooledSession implements Session, TopicSession, QueueSession {
+public class PooledSession implements Session, TopicSession, QueueSession, XASession {
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
 
+    private final SessionKey key;
+    private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
+    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
+    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
+    private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
+
+    private MessageProducer producer;
+    private TopicPublisher publisher;
+    private QueueSender sender;
+
     private Session session;
-    private SessionPool sessionPool;
-    private MessageProducer messageProducer;
-    private QueueSender queueSender;
-    private TopicPublisher topicPublisher;
     private boolean transactional = true;
     private boolean ignoreClose;
-
-    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
-    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
-    private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
-        new CopyOnWriteArrayList<PooledSessionEventListener>();
     private boolean isXa;
+    private boolean useAnonymousProducers = true;
 
-    public PooledSession(Session session, SessionPool sessionPool, boolean transactional) {
+    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional, boolean anonymous) {
+        this.key = key;
         this.session = session;
         this.sessionPool = sessionPool;
         this.transactional = transactional;
+        this.useAnonymousProducers = anonymous;
     }
 
-    public void addTempDestEventListener(PooledSessionEventListener listener) {
-        this.tempDestEventListeners.add(listener);
+    public void addSessionEventListener(PooledSessionEventListener listener) {
+        // only add if really needed
+        if (!sessionEventListeners.contains(listener)) {
+            this.sessionEventListeners.add(listener);
+        }
     }
 
     protected boolean isIgnoreClose() {
@@ -61,10 +69,9 @@ public class PooledSession implements Se
         this.ignoreClose = ignoreClose;
     }
 
+    @Override
     public void close() throws JMSException {
         if (!ignoreClose) {
-            // TODO a cleaner way to reset??
-
             boolean invalidate = false;
             try {
                 // lets reset the session
@@ -95,11 +102,15 @@ public class PooledSession implements Se
             } finally {
                 consumers.clear();
                 browsers.clear();
+                for (PooledSessionEventListener listener : this.sessionEventListeners) {
+                    listener.onSessionClosed(this);
+                }
+                sessionEventListeners.clear();
             }
 
             if (invalidate) {
-                // lets close the session and not put the session back into
-                // the pool
+                // lets close the session and not put the session back into the pool
+                // instead invalidate it so the pool can create a new one on demand.
                 if (session != null) {
                     try {
                         session.close();
@@ -108,114 +119,145 @@ public class PooledSession implements Se
                     }
                     session = null;
                 }
-                sessionPool.invalidateSession(this);
+                try {
+                    sessionPool.invalidateObject(key, this);
+                } catch (Exception e) {
+                    LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
+                }
             } else {
-                sessionPool.returnSession(this);
+                try {
+                    sessionPool.returnObject(key, this);
+                } catch (Exception e) {
+                    javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
+                    illegalStateException.initCause(e);
+                    throw illegalStateException;
+                }
             }
         }
     }
 
+    @Override
     public void commit() throws JMSException {
         getInternalSession().commit();
     }
 
+    @Override
     public BytesMessage createBytesMessage() throws JMSException {
         return getInternalSession().createBytesMessage();
     }
 
+    @Override
     public MapMessage createMapMessage() throws JMSException {
         return getInternalSession().createMapMessage();
     }
 
+    @Override
     public Message createMessage() throws JMSException {
         return getInternalSession().createMessage();
     }
 
+    @Override
     public ObjectMessage createObjectMessage() throws JMSException {
         return getInternalSession().createObjectMessage();
     }
 
+    @Override
     public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
         return getInternalSession().createObjectMessage(serializable);
     }
 
+    @Override
     public Queue createQueue(String s) throws JMSException {
         return getInternalSession().createQueue(s);
     }
 
+    @Override
     public StreamMessage createStreamMessage() throws JMSException {
         return getInternalSession().createStreamMessage();
     }
 
+    @Override
     public TemporaryQueue createTemporaryQueue() throws JMSException {
         TemporaryQueue result;
 
         result = getInternalSession().createTemporaryQueue();
 
         // Notify all of the listeners of the created temporary Queue.
-        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+        for (PooledSessionEventListener listener : this.sessionEventListeners) {
             listener.onTemporaryQueueCreate(result);
         }
 
         return result;
     }
 
+    @Override
     public TemporaryTopic createTemporaryTopic() throws JMSException {
         TemporaryTopic result;
 
         result = getInternalSession().createTemporaryTopic();
 
         // Notify all of the listeners of the created temporary Topic.
-        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+        for (PooledSessionEventListener listener : this.sessionEventListeners) {
             listener.onTemporaryTopicCreate(result);
         }
 
         return result;
     }
 
+    @Override
     public void unsubscribe(String s) throws JMSException {
         getInternalSession().unsubscribe(s);
     }
 
+    @Override
     public TextMessage createTextMessage() throws JMSException {
         return getInternalSession().createTextMessage();
     }
 
+    @Override
     public TextMessage createTextMessage(String s) throws JMSException {
         return getInternalSession().createTextMessage(s);
     }
 
+    @Override
     public Topic createTopic(String s) throws JMSException {
         return getInternalSession().createTopic(s);
     }
 
+    @Override
     public int getAcknowledgeMode() throws JMSException {
         return getInternalSession().getAcknowledgeMode();
     }
 
+    @Override
     public boolean getTransacted() throws JMSException {
         return getInternalSession().getTransacted();
     }
 
+    @Override
     public void recover() throws JMSException {
         getInternalSession().recover();
     }
 
+    @Override
     public void rollback() throws JMSException {
         getInternalSession().rollback();
     }
 
+    @Override
     public XAResource getXAResource() {
-        if (session == null) {
-            throw new IllegalStateException("Session is closed");
+        if (session instanceof XASession) {
+            return ((XASession) session).getXAResource();
         }
-        return ((XASession) session).getXAResource();
+        return null;
     }
 
+    @Override
     public Session getSession() {
         return this;
     }
 
+    @Override
     public void run() {
         if (session != null) {
             session.run();
@@ -224,112 +266,168 @@ public class PooledSession implements Se
 
     // Consumer related methods
     // -------------------------------------------------------------------------
+    @Override
     public QueueBrowser createBrowser(Queue queue) throws JMSException {
         return addQueueBrowser(getInternalSession().createBrowser(queue));
     }
 
+    @Override
     public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
         return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
     }
 
+    @Override
     public MessageConsumer createConsumer(Destination destination) throws JMSException {
         return addConsumer(getInternalSession().createConsumer(destination));
     }
 
+    @Override
     public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
         return addConsumer(getInternalSession().createConsumer(destination, selector));
     }
 
+    @Override
     public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
         return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
     }
 
+    @Override
     public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
         return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
     }
 
+    @Override
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
         return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
     }
 
+    @Override
     public MessageListener getMessageListener() throws JMSException {
         return getInternalSession().getMessageListener();
     }
 
+    @Override
     public void setMessageListener(MessageListener messageListener) throws JMSException {
         getInternalSession().setMessageListener(messageListener);
     }
 
+    @Override
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
         return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic));
     }
 
+    @Override
     public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
         return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local));
     }
 
+    @Override
     public QueueReceiver createReceiver(Queue queue) throws JMSException {
         return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue));
     }
 
+    @Override
     public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
         return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector));
     }
 
     // Producer related methods
     // -------------------------------------------------------------------------
+    @Override
     public MessageProducer createProducer(Destination destination) throws JMSException {
-        return new PooledProducer(getMessageProducer(), destination);
+        return new PooledProducer(getMessageProducer(destination), destination);
     }
 
+    @Override
     public QueueSender createSender(Queue queue) throws JMSException {
-        return new PooledQueueSender(getQueueSender(), queue);
+        return new PooledQueueSender(getQueueSender(queue), queue);
     }
 
+    @Override
     public TopicPublisher createPublisher(Topic topic) throws JMSException {
-        return new PooledTopicPublisher(getTopicPublisher(), topic);
-    }
-
-    /**
-     * Callback invoked when the consumer is closed.
-     * <p/>
-     * This is used to keep track of an explicit closed consumer created by this
-     * session, by which we know do not need to keep track of the consumer, as
-     * its already closed.
-     *
-     * @param consumer
-     *            the consumer which is being closed
-     */
-    protected void onConsumerClose(MessageConsumer consumer) {
-        consumers.remove(consumer);
+        return new PooledTopicPublisher(getTopicPublisher(topic), topic);
     }
 
-    public Session getInternalSession() throws JMSException {
+    public Session getInternalSession() throws IllegalStateException {
         if (session == null) {
-            throw new JMSException("The session has already been closed");
+            throw new IllegalStateException("The session has already been closed");
         }
         return session;
     }
 
     public MessageProducer getMessageProducer() throws JMSException {
-        if (messageProducer == null) {
-            messageProducer = getInternalSession().createProducer(null);
+        return getMessageProducer(null);
+    }
+
+    public MessageProducer getMessageProducer(Destination destination) throws JMSException {
+        MessageProducer result = null;
+
+        if (useAnonymousProducers) {
+            if (producer == null) {
+                // Don't allow for duplicate anonymous producers.
+                synchronized (this) {
+                    if (producer == null) {
+                        producer = getInternalSession().createProducer(null);
+                    }
+                }
+            }
+
+            result = producer;
+        } else {
+            result = getInternalSession().createProducer(destination);
         }
-        return messageProducer;
+
+        return result;
     }
 
     public QueueSender getQueueSender() throws JMSException {
-        if (queueSender == null) {
-            queueSender = ((QueueSession) getInternalSession()).createSender(null);
+        return getQueueSender(null);
+    }
+
+    public QueueSender getQueueSender(Queue destination) throws JMSException {
+        QueueSender result = null;
+
+        if (useAnonymousProducers) {
+            if (sender == null) {
+                // Don't allow for duplicate anonymous producers.
+                synchronized (this) {
+                    if (sender == null) {
+                        sender = ((QueueSession) getInternalSession()).createSender(null);
+                    }
+                }
+            }
+
+            result = sender;
+        } else {
+            result = ((QueueSession) getInternalSession()).createSender(destination);
         }
-        return queueSender;
+
+        return result;
     }
 
     public TopicPublisher getTopicPublisher() throws JMSException {
-        if (topicPublisher == null) {
-            topicPublisher = ((TopicSession) getInternalSession()).createPublisher(null);
+        return getTopicPublisher(null);
+    }
+
+    public TopicPublisher getTopicPublisher(Topic destination) throws JMSException {
+        TopicPublisher result = null;
+
+        if (useAnonymousProducers) {
+            if (publisher == null) {
+                // Don't allow for duplicate anonymous producers.
+                synchronized (this) {
+                    if (publisher == null) {
+                        publisher = ((TopicSession) getInternalSession()).createPublisher(null);
+                    }
+                }
+            }
+
+            result = publisher;
+        } else {
+            result = ((TopicSession) getInternalSession()).createPublisher(destination);
         }
-        return topicPublisher;
+
+        return result;
     }
 
     private QueueBrowser addQueueBrowser(QueueBrowser browser) {
@@ -340,10 +438,8 @@ public class PooledSession implements Se
     private MessageConsumer addConsumer(MessageConsumer consumer) {
         consumers.add(consumer);
         // must wrap in PooledMessageConsumer to ensure the onConsumerClose
-        // method is invoked
-        // when the returned consumer is closed, to avoid memory leak in this
-        // session class
-        // in case many consumers is created
+        // method is invoked when the returned consumer is closed, to avoid memory
+        // leak in this session class in case many consumers is created
         return new PooledMessageConsumer(this, consumer);
     }
 
@@ -361,7 +457,22 @@ public class PooledSession implements Se
         this.isXa = isXa;
     }
 
+    @Override
     public String toString() {
         return "PooledSession { " + session + " }";
     }
+
+    /**
+     * Callback invoked when the consumer is closed.
+     * <p/>
+     * This is used to keep track of an explicit closed consumer created by this
+     * session, by which we know do not need to keep track of the consumer, as
+     * its already closed.
+     *
+     * @param consumer
+     *            the consumer which is being closed
+     */
+    protected void onConsumerClose(MessageConsumer consumer) {
+        consumers.remove(consumer);
+    }
 }

Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java Thu Jun 12 13:00:02 2014
@@ -25,7 +25,7 @@ interface PooledSessionEventListener {
      * Called on successful creation of a new TemporaryQueue.
      *
      * @param tempQueue
-     *            The TemporaryQueue just created.
+     *      The TemporaryQueue just created.
      */
     void onTemporaryQueueCreate(TemporaryQueue tempQueue);
 
@@ -33,8 +33,16 @@ interface PooledSessionEventListener {
      * Called on successful creation of a new TemporaryTopic.
      *
      * @param tempTopic
-     *            The TemporaryTopic just created.
+     *      The TemporaryTopic just created.
      */
     void onTemporaryTopicCreate(TemporaryTopic tempTopic);
 
+    /**
+     * Called when the PooledSession is closed.
+     *
+     * @param session
+     *      The PooledSession that has been closed.
+     */
+    void onSessionClosed(PooledSession session);
+
 }

Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java Thu Jun 12 13:00:02 2014
@@ -16,6 +16,7 @@
  */
 package org.apache.aries.transaction.jms.internal;
 
+import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.XAConnection;
 import javax.jms.XASession;
@@ -29,8 +30,8 @@ public class RecoverableConnectionPool e
 
     private String name;
 
-    public RecoverableConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) throws JMSException {
-        super(connection, poolFactory, transactionManager);
+    public RecoverableConnectionPool(Connection connection, TransactionManager transactionManager, String name) {
+        super(connection, transactionManager);
         this.name = name;
     }
 

Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java Thu Jun 12 13:00:02 2014
@@ -22,8 +22,10 @@ package org.apache.aries.transaction.jms
  * 
  */
 public class SessionKey {
+
     private boolean transacted;
     private int ackMode;
+
     private int hash;
 
     public SessionKey(boolean transacted, int ackMode) {

Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java Thu Jun 12 13:00:02 2014
@@ -16,8 +16,11 @@
  */
 package org.apache.aries.transaction.jms.internal;
 
+import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 import javax.jms.XAConnection;
 import javax.transaction.RollbackException;
 import javax.transaction.Status;
@@ -35,28 +38,58 @@ import org.apache.commons.pool.ObjectPoo
  */
 public class XaConnectionPool extends ConnectionPool {
 
-    private TransactionManager transactionManager;
+    private final TransactionManager transactionManager;
 
-    public XaConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) throws JMSException {
-        super(connection, poolFactory);
+    public XaConnectionPool(Connection connection, TransactionManager transactionManager) {
+        super(connection);
         this.transactionManager = transactionManager;
     }
 
+    @Override
+    protected Session makeSession(SessionKey key) throws JMSException {
+        return ((XAConnection) connection).createXASession();
+    }
+
+    @Override
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
-    	PooledSession session = null;
         try {
             boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
             if (isXa) {
-                transacted = true;
-                ackMode = Session.SESSION_TRANSACTED;
-                session = (PooledSession) super.createXaSession(transacted, ackMode);
+                // if the xa tx aborts inflight we don't want to auto create a
+                // local transaction or auto ack
+                transacted = false;
+                ackMode = Session.CLIENT_ACKNOWLEDGE;
+            } else if (transactionManager != null) {
+                // cmt or transactionManager managed
+                transacted = false;
+                if (ackMode == Session.SESSION_TRANSACTED) {
+                    ackMode = Session.AUTO_ACKNOWLEDGE;
+                }
+            }
+            PooledSession session = (PooledSession) super.createSession(transacted, ackMode);
+            if (isXa) {
+                session.addSessionEventListener(new PooledSessionEventListener() {
+
+                    @Override
+                    public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
+                    }
+
+                    @Override
+                    public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
+                    }
+
+                    @Override
+                    public void onSessionClosed(PooledSession session) {
+                        session.setIgnoreClose(true);
+                        session.setIsXa(false);
+                    }
+                });
                 session.setIgnoreClose(true);
                 session.setIsXa(true);
                 transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
                 incrementReferenceCount();
                 transactionManager.getTransaction().enlistResource(createXaResource(session));
             } else {
-            	session = (PooledSession) super.createSession(transacted, ackMode);
                 session.setIgnoreClose(false);
             }
             return session;
@@ -74,8 +107,7 @@ public class XaConnectionPool extends Co
     protected XAResource createXaResource(PooledSession session) throws JMSException {
         return session.getXAResource();
     }
-    
-    
+
     protected class Synchronization implements javax.transaction.Synchronization {
         private final PooledSession session;
 
@@ -83,21 +115,20 @@ public class XaConnectionPool extends Co
             this.session = session;
         }
 
+        @Override
         public void beforeCompletion() {
         }
-        
+
+        @Override
         public void afterCompletion(int status) {
             try {
                 // This will return session to the pool.
                 session.setIgnoreClose(false);
                 session.close();
-                session.setIgnoreClose(true);
-                session.setIsXa(false);
                 decrementReferenceCount();
             } catch (JMSException e) {
                 throw new RuntimeException(e);
             }
         }
     }
-    
 }

Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java Thu Jun 12 13:00:02 2014
@@ -16,58 +16,133 @@
  */
 package org.apache.aries.transaction.jms.internal;
 
+import java.io.Serializable;
+import java.util.Hashtable;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
 import javax.jms.XAConnection;
 import javax.jms.XAConnectionFactory;
+import javax.naming.Binding;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.naming.NamingEnumeration;
+import javax.naming.spi.ObjectFactory;
 import javax.transaction.TransactionManager;
 
 import org.apache.aries.transaction.jms.PooledConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A pooled connection factory that automatically enlists
  * sessions in the current active XA transaction if any.
  */
-public class XaPooledConnectionFactory extends PooledConnectionFactory {
+public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory,
+        Serializable, QueueConnectionFactory, TopicConnectionFactory {
 
-    private XAConnectionFactory xaConnectionFactory;
+    private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class);
     private TransactionManager transactionManager;
-    
-    public XaPooledConnectionFactory() {
-        super();
+    private boolean tmFromJndi = false;
+    private String tmJndiName = "java:/TransactionManager";
+
+    public TransactionManager getTransactionManager() {
+        if (transactionManager == null && tmFromJndi) {
+            try {
+                transactionManager = (TransactionManager) new InitialContext().lookup(getTmJndiName());
+            } catch (Throwable ignored) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("exception on tmFromJndi: " + getTmJndiName(), ignored);
+                }
+            }
+        }
+        return transactionManager;
     }
 
-    public XAConnectionFactory getXaConnectionFactory() {
-        return xaConnectionFactory;
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
     }
 
-    public void setXaConnectionFactory(XAConnectionFactory xaConnectionFactory) {
-    	this.xaConnectionFactory = xaConnectionFactory;
-        setConnectionFactory(new ConnectionFactory() {
-            public Connection createConnection() throws JMSException {
-                return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection();
-            }
-            public Connection createConnection(String userName, String password) throws JMSException {
-                return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection(userName, password);
+    @Override
+    protected ConnectionPool createConnectionPool(Connection connection) {
+        return new XaConnectionPool(connection, getTransactionManager());
+    }
+
+    @Override
+    public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable<?, ?> environment) throws Exception {
+        setTmFromJndi(true);
+        configFromJndiConf(obj);
+        if (environment != null) {
+            IntrospectionSupport.setProperties(this, environment);
+        }
+        return this;
+    }
+
+    private void configFromJndiConf(Object rootContextName) {
+        if (rootContextName instanceof String) {
+            String name = (String) rootContextName;
+            name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/'));
+            try {
+                InitialContext ctx = new InitialContext();
+                NamingEnumeration bindings = ctx.listBindings(name);
+
+                while (bindings.hasMore()) {
+                    Binding bd = (Binding)bindings.next();
+                    IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject());
+                }
+
+            } catch (Exception ignored) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("exception on config from jndi: " + name, ignored);
+                }
             }
-        });
+        }
     }
 
-    public TransactionManager getTransactionManager() {
-        return transactionManager;
+    public String getTmJndiName() {
+        return tmJndiName;
+    }
+
+    public void setTmJndiName(String tmJndiName) {
+        this.tmJndiName = tmJndiName;
+    }
+
+    public boolean isTmFromJndi() {
+        return tmFromJndi;
     }
 
     /**
-     * The XA TransactionManager to use to enlist the JMS sessions into.
-     *
-     * @org.apache.xbean.Property required=true
+     * Allow transaction manager resolution from JNDI (ee deployment)
+     * @param tmFromJndi
      */
-    public void setTransactionManager(TransactionManager transactionManager) {
-        this.transactionManager = transactionManager;
+    public void setTmFromJndi(boolean tmFromJndi) {
+        this.tmFromJndi = tmFromJndi;
+    }
+
+    @Override
+    public QueueConnection createQueueConnection() throws JMSException {
+        return (QueueConnection) createConnection();
     }
 
-    protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
-        return new XaConnectionPool((XAConnection) connection, getPoolFactory(), getTransactionManager());
+    @Override
+    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
+        return (QueueConnection) createConnection(userName, password);
     }
+
+    @Override
+    public TopicConnection createTopicConnection() throws JMSException {
+        return (TopicConnection) createConnection();
+    }
+
+    @Override
+    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
+        return (TopicConnection) createConnection(userName, password);
+    }
+
 }

Modified: aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml (original)
+++ aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml Thu Jun 12 13:00:02 2014
@@ -21,13 +21,13 @@ limitations under the License.
 
     <service interface="org.apache.aries.blueprint.NamespaceHandler">
         <service-properties>
-            <entry key="osgi.service.blueprint.namespace" value="http://aries.apache.org/xmlns/transaction-jms/1.0"/>
+            <entry key="osgi.service.blueprint.namespace" value="http://aries.apache.org/xmlns/transaction-jms/2.0"/>
         </service-properties>
         <bean class="org.apache.xbean.blueprint.context.impl.XBeanNamespaceHandler">
-            <argument value="http://aries.apache.org/xmlns/transaction-jms/1.0"/>
+            <argument value="http://aries.apache.org/xmlns/transaction-jms/2.0"/>
             <argument value="org.apache.aries.transaction.jms.xsd"/>
             <argument ref="blueprintBundle"/>
-            <argument value="META-INF/services/org/apache/xbean/spring/http/aries.apache.org/xmlns/transaction-jms/1.0"/>
+            <argument value="META-INF/services/org/apache/xbean/spring/http/aries.apache.org/xmlns/transaction-jms/2.0"/>
         </bean>
     </service>