You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by gn...@apache.org on 2014/06/12 15:00:03 UTC
svn commit: r1602148 [2/2] - in /aries/trunk/transaction/transaction-jms: ./
src/main/java/org/apache/aries/transaction/jms/
src/main/java/org/apache/aries/transaction/jms/internal/
src/main/resources/OSGI-INF/blueprint/
Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java Thu Jun 12 13:00:02 2014
@@ -23,34 +23,42 @@ import java.util.concurrent.CopyOnWriteA
import javax.jms.*;
import javax.transaction.xa.XAResource;
+import org.apache.commons.pool.KeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PooledSession implements Session, TopicSession, QueueSession {
+public class PooledSession implements Session, TopicSession, QueueSession, XASession {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
+ private final SessionKey key;
+ private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
+ private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
+ private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
+ private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
+
+ private MessageProducer producer;
+ private TopicPublisher publisher;
+ private QueueSender sender;
+
private Session session;
- private SessionPool sessionPool;
- private MessageProducer messageProducer;
- private QueueSender queueSender;
- private TopicPublisher topicPublisher;
private boolean transactional = true;
private boolean ignoreClose;
-
- private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
- private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
- private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
- new CopyOnWriteArrayList<PooledSessionEventListener>();
private boolean isXa;
+ private boolean useAnonymousProducers = true;
- public PooledSession(Session session, SessionPool sessionPool, boolean transactional) {
+ public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional, boolean anonymous) {
+ this.key = key;
this.session = session;
this.sessionPool = sessionPool;
this.transactional = transactional;
+ this.useAnonymousProducers = anonymous;
}
- public void addTempDestEventListener(PooledSessionEventListener listener) {
- this.tempDestEventListeners.add(listener);
+ public void addSessionEventListener(PooledSessionEventListener listener) {
+ // only add if really needed
+ if (!sessionEventListeners.contains(listener)) {
+ this.sessionEventListeners.add(listener);
+ }
}
protected boolean isIgnoreClose() {
@@ -61,10 +69,9 @@ public class PooledSession implements Se
this.ignoreClose = ignoreClose;
}
+ @Override
public void close() throws JMSException {
if (!ignoreClose) {
- // TODO a cleaner way to reset??
-
boolean invalidate = false;
try {
// lets reset the session
@@ -95,11 +102,15 @@ public class PooledSession implements Se
} finally {
consumers.clear();
browsers.clear();
+ for (PooledSessionEventListener listener : this.sessionEventListeners) {
+ listener.onSessionClosed(this);
+ }
+ sessionEventListeners.clear();
}
if (invalidate) {
- // lets close the session and not put the session back into
- // the pool
+ // let's close the session and not put it back into the pool;
+ // instead, invalidate it so the pool can create a new one on demand.
if (session != null) {
try {
session.close();
@@ -108,114 +119,145 @@ public class PooledSession implements Se
}
session = null;
}
- sessionPool.invalidateSession(this);
+ try {
+ sessionPool.invalidateObject(key, this);
+ } catch (Exception e) {
+ LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
+ }
} else {
- sessionPool.returnSession(this);
+ try {
+ sessionPool.returnObject(key, this);
+ } catch (Exception e) {
+ javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
+ illegalStateException.initCause(e);
+ throw illegalStateException;
+ }
}
}
}
+ @Override
public void commit() throws JMSException {
getInternalSession().commit();
}
+ @Override
public BytesMessage createBytesMessage() throws JMSException {
return getInternalSession().createBytesMessage();
}
+ @Override
public MapMessage createMapMessage() throws JMSException {
return getInternalSession().createMapMessage();
}
+ @Override
public Message createMessage() throws JMSException {
return getInternalSession().createMessage();
}
+ @Override
public ObjectMessage createObjectMessage() throws JMSException {
return getInternalSession().createObjectMessage();
}
+ @Override
public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
return getInternalSession().createObjectMessage(serializable);
}
+ @Override
public Queue createQueue(String s) throws JMSException {
return getInternalSession().createQueue(s);
}
+ @Override
public StreamMessage createStreamMessage() throws JMSException {
return getInternalSession().createStreamMessage();
}
+ @Override
public TemporaryQueue createTemporaryQueue() throws JMSException {
TemporaryQueue result;
result = getInternalSession().createTemporaryQueue();
// Notify all of the listeners of the created temporary Queue.
- for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+ for (PooledSessionEventListener listener : this.sessionEventListeners) {
listener.onTemporaryQueueCreate(result);
}
return result;
}
+ @Override
public TemporaryTopic createTemporaryTopic() throws JMSException {
TemporaryTopic result;
result = getInternalSession().createTemporaryTopic();
// Notify all of the listeners of the created temporary Topic.
- for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+ for (PooledSessionEventListener listener : this.sessionEventListeners) {
listener.onTemporaryTopicCreate(result);
}
return result;
}
+ @Override
public void unsubscribe(String s) throws JMSException {
getInternalSession().unsubscribe(s);
}
+ @Override
public TextMessage createTextMessage() throws JMSException {
return getInternalSession().createTextMessage();
}
+ @Override
public TextMessage createTextMessage(String s) throws JMSException {
return getInternalSession().createTextMessage(s);
}
+ @Override
public Topic createTopic(String s) throws JMSException {
return getInternalSession().createTopic(s);
}
+ @Override
public int getAcknowledgeMode() throws JMSException {
return getInternalSession().getAcknowledgeMode();
}
+ @Override
public boolean getTransacted() throws JMSException {
return getInternalSession().getTransacted();
}
+ @Override
public void recover() throws JMSException {
getInternalSession().recover();
}
+ @Override
public void rollback() throws JMSException {
getInternalSession().rollback();
}
+ @Override
public XAResource getXAResource() {
- if (session == null) {
- throw new IllegalStateException("Session is closed");
+ if (session instanceof XASession) {
+ return ((XASession) session).getXAResource();
}
- return ((XASession) session).getXAResource();
+ return null;
}
+ @Override
public Session getSession() {
return this;
}
+ @Override
public void run() {
if (session != null) {
session.run();
@@ -224,112 +266,168 @@ public class PooledSession implements Se
// Consumer related methods
// -------------------------------------------------------------------------
+ @Override
public QueueBrowser createBrowser(Queue queue) throws JMSException {
return addQueueBrowser(getInternalSession().createBrowser(queue));
}
+ @Override
public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
}
+ @Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
return addConsumer(getInternalSession().createConsumer(destination));
}
+ @Override
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
return addConsumer(getInternalSession().createConsumer(destination, selector));
}
+ @Override
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
}
+ @Override
public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
}
+ @Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
}
+ @Override
public MessageListener getMessageListener() throws JMSException {
return getInternalSession().getMessageListener();
}
+ @Override
public void setMessageListener(MessageListener messageListener) throws JMSException {
getInternalSession().setMessageListener(messageListener);
}
+ @Override
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic));
}
+ @Override
public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local));
}
+ @Override
public QueueReceiver createReceiver(Queue queue) throws JMSException {
return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue));
}
+ @Override
public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector));
}
// Producer related methods
// -------------------------------------------------------------------------
+ @Override
public MessageProducer createProducer(Destination destination) throws JMSException {
- return new PooledProducer(getMessageProducer(), destination);
+ return new PooledProducer(getMessageProducer(destination), destination);
}
+ @Override
public QueueSender createSender(Queue queue) throws JMSException {
- return new PooledQueueSender(getQueueSender(), queue);
+ return new PooledQueueSender(getQueueSender(queue), queue);
}
+ @Override
public TopicPublisher createPublisher(Topic topic) throws JMSException {
- return new PooledTopicPublisher(getTopicPublisher(), topic);
- }
-
- /**
- * Callback invoked when the consumer is closed.
- * <p/>
- * This is used to keep track of an explicit closed consumer created by this
- * session, by which we know do not need to keep track of the consumer, as
- * its already closed.
- *
- * @param consumer
- * the consumer which is being closed
- */
- protected void onConsumerClose(MessageConsumer consumer) {
- consumers.remove(consumer);
+ return new PooledTopicPublisher(getTopicPublisher(topic), topic);
}
- public Session getInternalSession() throws JMSException {
+ public Session getInternalSession() throws IllegalStateException {
if (session == null) {
- throw new JMSException("The session has already been closed");
+ throw new IllegalStateException("The session has already been closed");
}
return session;
}
public MessageProducer getMessageProducer() throws JMSException {
- if (messageProducer == null) {
- messageProducer = getInternalSession().createProducer(null);
+ return getMessageProducer(null);
+ }
+
+ public MessageProducer getMessageProducer(Destination destination) throws JMSException {
+ MessageProducer result = null;
+
+ if (useAnonymousProducers) {
+ if (producer == null) {
+ // Don't allow for duplicate anonymous producers.
+ synchronized (this) {
+ if (producer == null) {
+ producer = getInternalSession().createProducer(null);
+ }
+ }
+ }
+
+ result = producer;
+ } else {
+ result = getInternalSession().createProducer(destination);
}
- return messageProducer;
+
+ return result;
}
public QueueSender getQueueSender() throws JMSException {
- if (queueSender == null) {
- queueSender = ((QueueSession) getInternalSession()).createSender(null);
+ return getQueueSender(null);
+ }
+
+ public QueueSender getQueueSender(Queue destination) throws JMSException {
+ QueueSender result = null;
+
+ if (useAnonymousProducers) {
+ if (sender == null) {
+ // Don't allow for duplicate anonymous producers.
+ synchronized (this) {
+ if (sender == null) {
+ sender = ((QueueSession) getInternalSession()).createSender(null);
+ }
+ }
+ }
+
+ result = sender;
+ } else {
+ result = ((QueueSession) getInternalSession()).createSender(destination);
}
- return queueSender;
+
+ return result;
}
public TopicPublisher getTopicPublisher() throws JMSException {
- if (topicPublisher == null) {
- topicPublisher = ((TopicSession) getInternalSession()).createPublisher(null);
+ return getTopicPublisher(null);
+ }
+
+ public TopicPublisher getTopicPublisher(Topic destination) throws JMSException {
+ TopicPublisher result = null;
+
+ if (useAnonymousProducers) {
+ if (publisher == null) {
+ // Don't allow for duplicate anonymous producers.
+ synchronized (this) {
+ if (publisher == null) {
+ publisher = ((TopicSession) getInternalSession()).createPublisher(null);
+ }
+ }
+ }
+
+ result = publisher;
+ } else {
+ result = ((TopicSession) getInternalSession()).createPublisher(destination);
}
- return topicPublisher;
+
+ return result;
}
private QueueBrowser addQueueBrowser(QueueBrowser browser) {
@@ -340,10 +438,8 @@ public class PooledSession implements Se
private MessageConsumer addConsumer(MessageConsumer consumer) {
consumers.add(consumer);
// must wrap in PooledMessageConsumer to ensure the onConsumerClose
- // method is invoked
- // when the returned consumer is closed, to avoid memory leak in this
- // session class
- // in case many consumers is created
+ // method is invoked when the returned consumer is closed, to avoid a memory
+ // leak in this session class in case many consumers are created
return new PooledMessageConsumer(this, consumer);
}
@@ -361,7 +457,22 @@ public class PooledSession implements Se
this.isXa = isXa;
}
+ @Override
public String toString() {
return "PooledSession { " + session + " }";
}
+
+ /**
+ * Callback invoked when the consumer is closed.
+ * <p/>
+ * This is used to stop tracking a consumer created by this session once it has
+ * been explicitly closed; we know we do not need to keep track of the consumer,
+ * as it is already closed.
+ *
+ * @param consumer
+ * the consumer which is being closed
+ */
+ protected void onConsumerClose(MessageConsumer consumer) {
+ consumers.remove(consumer);
+ }
}
Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java Thu Jun 12 13:00:02 2014
@@ -25,7 +25,7 @@ interface PooledSessionEventListener {
* Called on successful creation of a new TemporaryQueue.
*
* @param tempQueue
- * The TemporaryQueue just created.
+ * The TemporaryQueue just created.
*/
void onTemporaryQueueCreate(TemporaryQueue tempQueue);
@@ -33,8 +33,16 @@ interface PooledSessionEventListener {
* Called on successful creation of a new TemporaryTopic.
*
* @param tempTopic
- * The TemporaryTopic just created.
+ * The TemporaryTopic just created.
*/
void onTemporaryTopicCreate(TemporaryTopic tempTopic);
+ /**
+ * Called when the PooledSession is closed.
+ *
+ * @param session
+ * The PooledSession that has been closed.
+ */
+ void onSessionClosed(PooledSession session);
+
}
Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java Thu Jun 12 13:00:02 2014
@@ -16,6 +16,7 @@
*/
package org.apache.aries.transaction.jms.internal;
+import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XASession;
@@ -29,8 +30,8 @@ public class RecoverableConnectionPool e
private String name;
- public RecoverableConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) throws JMSException {
- super(connection, poolFactory, transactionManager);
+ public RecoverableConnectionPool(Connection connection, TransactionManager transactionManager, String name) {
+ super(connection, transactionManager);
this.name = name;
}
Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java Thu Jun 12 13:00:02 2014
@@ -22,8 +22,10 @@ package org.apache.aries.transaction.jms
*
*/
public class SessionKey {
+
private boolean transacted;
private int ackMode;
+
private int hash;
public SessionKey(boolean transacted, int ackMode) {
Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java Thu Jun 12 13:00:02 2014
@@ -16,8 +16,11 @@
*/
package org.apache.aries.transaction.jms.internal;
+import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
import javax.jms.XAConnection;
import javax.transaction.RollbackException;
import javax.transaction.Status;
@@ -35,28 +38,58 @@ import org.apache.commons.pool.ObjectPoo
*/
public class XaConnectionPool extends ConnectionPool {
- private TransactionManager transactionManager;
+ private final TransactionManager transactionManager;
- public XaConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) throws JMSException {
- super(connection, poolFactory);
+ public XaConnectionPool(Connection connection, TransactionManager transactionManager) {
+ super(connection);
this.transactionManager = transactionManager;
}
+ @Override
+ protected Session makeSession(SessionKey key) throws JMSException {
+ return ((XAConnection) connection).createXASession();
+ }
+
+ @Override
public Session createSession(boolean transacted, int ackMode) throws JMSException {
- PooledSession session = null;
try {
boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
if (isXa) {
- transacted = true;
- ackMode = Session.SESSION_TRANSACTED;
- session = (PooledSession) super.createXaSession(transacted, ackMode);
+ // if the XA transaction aborts in flight we don't want to auto-create a
+ // local transaction or auto-acknowledge
+ transacted = false;
+ ackMode = Session.CLIENT_ACKNOWLEDGE;
+ } else if (transactionManager != null) {
+ // cmt or transactionManager managed
+ transacted = false;
+ if (ackMode == Session.SESSION_TRANSACTED) {
+ ackMode = Session.AUTO_ACKNOWLEDGE;
+ }
+ }
+ PooledSession session = (PooledSession) super.createSession(transacted, ackMode);
+ if (isXa) {
+ session.addSessionEventListener(new PooledSessionEventListener() {
+
+ @Override
+ public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
+ }
+
+ @Override
+ public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
+ }
+
+ @Override
+ public void onSessionClosed(PooledSession session) {
+ session.setIgnoreClose(true);
+ session.setIsXa(false);
+ }
+ });
session.setIgnoreClose(true);
session.setIsXa(true);
transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
incrementReferenceCount();
transactionManager.getTransaction().enlistResource(createXaResource(session));
} else {
- session = (PooledSession) super.createSession(transacted, ackMode);
session.setIgnoreClose(false);
}
return session;
@@ -74,8 +107,7 @@ public class XaConnectionPool extends Co
protected XAResource createXaResource(PooledSession session) throws JMSException {
return session.getXAResource();
}
-
-
+
protected class Synchronization implements javax.transaction.Synchronization {
private final PooledSession session;
@@ -83,21 +115,20 @@ public class XaConnectionPool extends Co
this.session = session;
}
+ @Override
public void beforeCompletion() {
}
-
+
+ @Override
public void afterCompletion(int status) {
try {
// This will return session to the pool.
session.setIgnoreClose(false);
session.close();
- session.setIgnoreClose(true);
- session.setIsXa(false);
decrementReferenceCount();
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
-
}
Modified: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java (original)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java Thu Jun 12 13:00:02 2014
@@ -16,58 +16,133 @@
*/
package org.apache.aries.transaction.jms.internal;
+import java.io.Serializable;
+import java.util.Hashtable;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
+import javax.naming.Binding;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.naming.NamingEnumeration;
+import javax.naming.spi.ObjectFactory;
import javax.transaction.TransactionManager;
import org.apache.aries.transaction.jms.PooledConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A pooled connection factory that automatically enlists
* sessions in the current active XA transaction if any.
*/
-public class XaPooledConnectionFactory extends PooledConnectionFactory {
+public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory,
+ Serializable, QueueConnectionFactory, TopicConnectionFactory {
- private XAConnectionFactory xaConnectionFactory;
+ private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class);
private TransactionManager transactionManager;
-
- public XaPooledConnectionFactory() {
- super();
+ private boolean tmFromJndi = false;
+ private String tmJndiName = "java:/TransactionManager";
+
+ public TransactionManager getTransactionManager() {
+ if (transactionManager == null && tmFromJndi) {
+ try {
+ transactionManager = (TransactionManager) new InitialContext().lookup(getTmJndiName());
+ } catch (Throwable ignored) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("exception on tmFromJndi: " + getTmJndiName(), ignored);
+ }
+ }
+ }
+ return transactionManager;
}
- public XAConnectionFactory getXaConnectionFactory() {
- return xaConnectionFactory;
+ public void setTransactionManager(TransactionManager transactionManager) {
+ this.transactionManager = transactionManager;
}
- public void setXaConnectionFactory(XAConnectionFactory xaConnectionFactory) {
- this.xaConnectionFactory = xaConnectionFactory;
- setConnectionFactory(new ConnectionFactory() {
- public Connection createConnection() throws JMSException {
- return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection();
- }
- public Connection createConnection(String userName, String password) throws JMSException {
- return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection(userName, password);
+ @Override
+ protected ConnectionPool createConnectionPool(Connection connection) {
+ return new XaConnectionPool(connection, getTransactionManager());
+ }
+
+ @Override
+ public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable<?, ?> environment) throws Exception {
+ setTmFromJndi(true);
+ configFromJndiConf(obj);
+ if (environment != null) {
+ IntrospectionSupport.setProperties(this, environment);
+ }
+ return this;
+ }
+
+ private void configFromJndiConf(Object rootContextName) {
+ if (rootContextName instanceof String) {
+ String name = (String) rootContextName;
+ name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/'));
+ try {
+ InitialContext ctx = new InitialContext();
+ NamingEnumeration bindings = ctx.listBindings(name);
+
+ while (bindings.hasMore()) {
+ Binding bd = (Binding)bindings.next();
+ IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject());
+ }
+
+ } catch (Exception ignored) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("exception on config from jndi: " + name, ignored);
+ }
}
- });
+ }
}
- public TransactionManager getTransactionManager() {
- return transactionManager;
+ public String getTmJndiName() {
+ return tmJndiName;
+ }
+
+ public void setTmJndiName(String tmJndiName) {
+ this.tmJndiName = tmJndiName;
+ }
+
+ public boolean isTmFromJndi() {
+ return tmFromJndi;
}
/**
- * The XA TransactionManager to use to enlist the JMS sessions into.
- *
- * @org.apache.xbean.Property required=true
+ * Allow transaction manager resolution from JNDI (EE deployment).
+ * @param tmFromJndi true to look up the transaction manager in JNDI when none has been set
*/
- public void setTransactionManager(TransactionManager transactionManager) {
- this.transactionManager = transactionManager;
+ public void setTmFromJndi(boolean tmFromJndi) {
+ this.tmFromJndi = tmFromJndi;
+ }
+
+ @Override
+ public QueueConnection createQueueConnection() throws JMSException {
+ return (QueueConnection) createConnection();
}
- protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
- return new XaConnectionPool((XAConnection) connection, getPoolFactory(), getTransactionManager());
+ @Override
+ public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
+ return (QueueConnection) createConnection(userName, password);
}
+
+ @Override
+ public TopicConnection createTopicConnection() throws JMSException {
+ return (TopicConnection) createConnection();
+ }
+
+ @Override
+ public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
+ return (TopicConnection) createConnection(userName, password);
+ }
+
}
Modified: aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml?rev=1602148&r1=1602147&r2=1602148&view=diff
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml (original)
+++ aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml Thu Jun 12 13:00:02 2014
@@ -21,13 +21,13 @@ limitations under the License.
<service interface="org.apache.aries.blueprint.NamespaceHandler">
<service-properties>
- <entry key="osgi.service.blueprint.namespace" value="http://aries.apache.org/xmlns/transaction-jms/1.0"/>
+ <entry key="osgi.service.blueprint.namespace" value="http://aries.apache.org/xmlns/transaction-jms/2.0"/>
</service-properties>
<bean class="org.apache.xbean.blueprint.context.impl.XBeanNamespaceHandler">
- <argument value="http://aries.apache.org/xmlns/transaction-jms/1.0"/>
+ <argument value="http://aries.apache.org/xmlns/transaction-jms/2.0"/>
<argument value="org.apache.aries.transaction.jms.xsd"/>
<argument ref="blueprintBundle"/>
- <argument value="META-INF/services/org/apache/xbean/spring/http/aries.apache.org/xmlns/transaction-jms/1.0"/>
+ <argument value="META-INF/services/org/apache/xbean/spring/http/aries.apache.org/xmlns/transaction-jms/2.0"/>
</bean>
</service>