You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by gn...@apache.org on 2012/07/31 19:08:05 UTC
svn commit: r1367676 [2/2] - in /aries/trunk/transaction: ./
transaction-jms/ transaction-jms/src/ transaction-jms/src/main/
transaction-jms/src/main/java/ transaction-jms/src/main/java/org/
transaction-jms/src/main/java/org/apache/ transaction-jms/src...
Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+/**
+ *
+ */
+public class PooledTopicPublisher extends PooledProducer implements TopicPublisher {
+
+ public PooledTopicPublisher(TopicPublisher messageProducer, Destination destination) throws JMSException {
+ super(messageProducer, destination);
+ }
+
+ public Topic getTopic() throws JMSException {
+ return getTopicPublisher().getTopic();
+ }
+
+ public void publish(Message message) throws JMSException {
+ getTopicPublisher().publish((Topic) getDestination(), message);
+ }
+
+ public void publish(Message message, int i, int i1, long l) throws JMSException {
+ getTopicPublisher().publish((Topic) getDestination(), message, i, i1, l);
+ }
+
+ public void publish(Topic topic, Message message) throws JMSException {
+ getTopicPublisher().publish(topic, message);
+ }
+
+ public void publish(Topic topic, Message message, int i, int i1, long l) throws JMSException {
+ getTopicPublisher().publish(topic, message, i, i1, l);
+ }
+
+ protected TopicPublisher getTopicPublisher() {
+ return (TopicPublisher) getMessageProducer();
+ }
+}
Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.commons.pool.ObjectPoolFactory;
+import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
+
+public class RecoverableConnectionPool extends XaConnectionPool {
+
+ private String name;
+
+ public RecoverableConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) throws JMSException {
+ super(connection, poolFactory, transactionManager);
+ this.name = name;
+ }
+
+ protected XAResource createXaResource(PooledSession session) throws JMSException {
+ XAResource xares = ((XASession) session.getInternalSession()).getXAResource();
+ if (name != null) {
+ xares = new WrapperNamedXAResource(xares, name);
+ }
+ return xares;
+ }
+}
Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+/**
+ * A cache key for the session details
+ *
+ *
+ */
+public class SessionKey {
+ private boolean transacted;
+ private int ackMode;
+ private int hash;
+
+ public SessionKey(boolean transacted, int ackMode) {
+ this.transacted = transacted;
+ this.ackMode = ackMode;
+ hash = ackMode;
+ if (transacted) {
+ hash = 31 * hash + 1;
+ }
+ }
+
+ public int hashCode() {
+ return hash;
+ }
+
+ public boolean equals(Object that) {
+ if (this == that) {
+ return true;
+ }
+ if (that instanceof SessionKey) {
+ return equals((SessionKey) that);
+ }
+ return false;
+ }
+
+ public boolean equals(SessionKey that) {
+ return this.transacted == that.transacted && this.ackMode == that.ackMode;
+ }
+
+ public boolean isTransacted() {
+ return transacted;
+ }
+
+ public int getAckMode() {
+ return ackMode;
+ }
+}
Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.PoolableObjectFactory;
+
+/**
+ * Represents the session pool for a given JMS connection.
+ *
+ *
+ */
+public class SessionPool implements PoolableObjectFactory {
+ private ConnectionPool connectionPool;
+ private SessionKey key;
+ private ObjectPool sessionPool;
+
+ public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) {
+ this.connectionPool = connectionPool;
+ this.key = key;
+ this.sessionPool = sessionPool;
+ sessionPool.setFactory(this);
+ }
+
+ public void close() throws Exception {
+ if (sessionPool != null) {
+ sessionPool.close();
+ }
+ sessionPool = null;
+ }
+
+ public PooledSession borrowSession() throws JMSException {
+ try {
+ Object object = getSessionPool().borrowObject();
+ return (PooledSession)object;
+ } catch (JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+
+ public void returnSession(PooledSession session) throws JMSException {
+ // lets check if we are already closed
+ getConnection();
+ try {
+ connectionPool.onSessionReturned(session);
+ getSessionPool().returnObject(session);
+ } catch (Exception e) {
+ throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e);
+ }
+ }
+
+ public void invalidateSession(PooledSession session) throws JMSException {
+ try {
+ connectionPool.onSessionInvalidated(session);
+ getSessionPool().invalidateObject(session);
+ } catch (Exception e) {
+ throw JMSExceptionSupport.create("Failed to invalidate session: " + e, e);
+ }
+ }
+
+ // PoolableObjectFactory methods
+ // -------------------------------------------------------------------------
+ public Object makeObject() throws Exception {
+ if (getConnection() instanceof XAConnection) {
+ return new PooledSession(createXaSession(), this, key.isTransacted());
+ } else {
+ return new PooledSession(createSession(), this, key.isTransacted());
+ }
+ }
+
+ public void destroyObject(Object o) throws Exception {
+ PooledSession session = (PooledSession)o;
+ session.getInternalSession().close();
+ }
+
+ public boolean validateObject(Object o) {
+ return true;
+ }
+
+ public void activateObject(Object o) throws Exception {
+ }
+
+ public void passivateObject(Object o) throws Exception {
+ }
+
+ // Implemention methods
+ // -------------------------------------------------------------------------
+ protected ObjectPool getSessionPool() throws JMSException {
+ if (sessionPool == null) {
+ throw new JMSException("Already closed");
+ }
+ return sessionPool;
+ }
+
+ protected Connection getConnection() throws JMSException {
+ return connectionPool.getConnection();
+ }
+
+ protected Session createSession() throws JMSException {
+ return getConnection().createSession(key.isTransacted(), key.getAckMode());
+ }
+
+ protected XASession createXaSession() throws JMSException {
+ return ((XAConnection)getConnection()).createXASession();
+ }
+
+}
Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.commons.pool.ObjectPoolFactory;
+
+/**
+ * An XA-aware connection pool. When a session is created and an xa transaction is active,
+ * the session will automatically be enlisted in the current transaction.
+ *
+ * @author gnodet
+ */
+public class XaConnectionPool extends ConnectionPool {
+
+ private TransactionManager transactionManager;
+
+ public XaConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) throws JMSException {
+ super(connection, poolFactory);
+ this.transactionManager = transactionManager;
+ }
+
+ public Session createSession(boolean transacted, int ackMode) throws JMSException {
+ PooledSession session = null;
+ try {
+ boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
+ if (isXa) {
+ transacted = true;
+ ackMode = Session.SESSION_TRANSACTED;
+ session = (PooledSession) super.createXaSession(transacted, ackMode);
+ session.setIgnoreClose(true);
+ session.setIsXa(true);
+ transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
+ incrementReferenceCount();
+ transactionManager.getTransaction().enlistResource(createXaResource(session));
+ } else {
+ session = (PooledSession) super.createSession(transacted, ackMode);
+ session.setIgnoreClose(false);
+ }
+ return session;
+ } catch (RollbackException e) {
+ final JMSException jmsException = new JMSException("Rollback Exception");
+ jmsException.initCause(e);
+ throw jmsException;
+ } catch (SystemException e) {
+ final JMSException jmsException = new JMSException("System Exception");
+ jmsException.initCause(e);
+ throw jmsException;
+ }
+ }
+
+ protected XAResource createXaResource(PooledSession session) throws JMSException {
+ return session.getXAResource();
+ }
+
+
+ protected class Synchronization implements javax.transaction.Synchronization {
+ private final PooledSession session;
+
+ private Synchronization(PooledSession session) {
+ this.session = session;
+ }
+
+ public void beforeCompletion() {
+ }
+
+ public void afterCompletion(int status) {
+ try {
+ // This will return session to the pool.
+ session.setIgnoreClose(false);
+ session.close();
+ session.setIgnoreClose(true);
+ session.setIsXa(false);
+ decrementReferenceCount();
+ } catch (JMSException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.transaction.TransactionManager;
+
+import org.apache.aries.transaction.jms.PooledConnectionFactory;
+
+/**
+ * A pooled connection factory that automatically enlists
+ * sessions in the current active XA transaction if any.
+ */
+public class XaPooledConnectionFactory extends PooledConnectionFactory {
+
+ private XAConnectionFactory xaConnectionFactory;
+ private TransactionManager transactionManager;
+
+ public XaPooledConnectionFactory() {
+ super();
+ }
+
+ public XAConnectionFactory getXaConnectionFactory() {
+ return xaConnectionFactory;
+ }
+
+ public void setXaConnectionFactory(XAConnectionFactory xaConnectionFactory) {
+ this.xaConnectionFactory = xaConnectionFactory;
+ setConnectionFactory(new ConnectionFactory() {
+ public Connection createConnection() throws JMSException {
+ return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection();
+ }
+ public Connection createConnection(String userName, String password) throws JMSException {
+ return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection(userName, password);
+ }
+ });
+ }
+
+ public TransactionManager getTransactionManager() {
+ return transactionManager;
+ }
+
+ /**
+ * The XA TransactionManager to use to enlist the JMS sessions into.
+ *
+ * @org.apache.xbean.Property required=true
+ */
+ public void setTransactionManager(TransactionManager transactionManager) {
+ this.transactionManager = transactionManager;
+ }
+
+ protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
+ return new XaConnectionPool((XAConnection) connection, getPoolFactory(), getTransactionManager());
+ }
+}
Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html Tue Jul 31 17:08:04 2012
@@ -0,0 +1,26 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like
+Spring's <a href="http://activemq.org/Spring+Support">JmsTemplate</a>.
+
+</body>
+</html>
Propchange: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html
------------------------------------------------------------------------------
svn:executable = *
Added: aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml (added)
+++ aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml Tue Jul 31 17:08:04 2012
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied.
+
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
+
+ <service interface="org.apache.aries.blueprint.NamespaceHandler">
+ <service-properties>
+ <entry key="osgi.service.blueprint.namespace" value="http://aries.apache.org/xmlns/transaction-jms/1.0"/>
+ </service-properties>
+ <bean class="org.apache.xbean.blueprint.context.impl.XBeanNamespaceHandler">
+ <argument value="http://aries.apache.org/xmlns/transaction-jms/1.0"/>
+ <argument value="org.apache.aries.transaction.jms.xsd"/>
+ <argument ref="blueprintBundle"/>
+ <argument value="META-INF/services/org/apache/xbean/spring/http/aries.apache.org/xmlns/transaction-jms/1.0"/>
+ </bean>
+ </service>
+
+</blueprint>