You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by gn...@apache.org on 2012/07/31 19:08:05 UTC

svn commit: r1367676 [2/2] - in /aries/trunk/transaction: ./ transaction-jms/ transaction-jms/src/ transaction-jms/src/main/ transaction-jms/src/main/java/ transaction-jms/src/main/java/org/ transaction-jms/src/main/java/org/apache/ transaction-jms/src...

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+/**
+ * A {@link TopicPublisher} that forwards every call to the publisher returned by {@link #getTopicPublisher()}.
+ */
+public class PooledTopicPublisher extends PooledProducer implements TopicPublisher {
+
+    public PooledTopicPublisher(TopicPublisher messageProducer, Destination destination) throws JMSException {
+        super(messageProducer, destination); // both arguments are handed to PooledProducer unchanged
+    }
+
+    public Topic getTopic() throws JMSException {
+        return getTopicPublisher().getTopic();
+    }
+
+    public void publish(Message message) throws JMSException {
+        getTopicPublisher().publish((Topic) getDestination(), message); // no topic given: target is getDestination() cast to Topic
+    }
+
+    public void publish(Message message, int i, int i1, long l) throws JMSException { // i, i1, l: deliveryMode, priority, timeToLive in javax.jms.TopicPublisher
+        getTopicPublisher().publish((Topic) getDestination(), message, i, i1, l);
+    }
+
+    public void publish(Topic topic, Message message) throws JMSException {
+        getTopicPublisher().publish(topic, message); // explicit topic: getDestination() is not consulted
+    }
+
+    public void publish(Topic topic, Message message, int i, int i1, long l) throws JMSException { // i, i1, l: deliveryMode, priority, timeToLive in javax.jms.TopicPublisher
+        getTopicPublisher().publish(topic, message, i, i1, l);
+    }
+
+    protected TopicPublisher getTopicPublisher() {
+        return (TopicPublisher) getMessageProducer(); // NOTE(review): assumes getMessageProducer() returns the publisher passed to the constructor — confirm in PooledProducer
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.commons.pool.ObjectPoolFactory;
+import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
+
+public class RecoverableConnectionPool extends XaConnectionPool { // XaConnectionPool variant that can wrap its XAResources with a name
+
+    private String name; // passed to WrapperNamedXAResource; when null the XAResource is returned unwrapped
+
+    public RecoverableConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) throws JMSException {
+        super(connection, poolFactory, transactionManager);
+        this.name = name;
+    }
+
+    protected XAResource createXaResource(PooledSession session) throws JMSException { // same signature as XaConnectionPool.createXaResource
+        XAResource xares = ((XASession) session.getInternalSession()).getXAResource(); // unchecked cast: fails with ClassCastException if the internal session is not an XASession
+        if (name != null) {
+            xares = new WrapperNamedXAResource(xares, name); // presumably lets the Geronimo transaction manager identify this resource by name — confirm
+        }
+        return xares;
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+/**
+ * A cache key for the session details: the pair (transacted flag, acknowledge mode).
+ * Two keys are equal when both values match; the hash code is computed once in the
+ * constructor and cached, so the two key fields are immutable.
+ */
+public class SessionKey {
+    private final boolean transacted;
+    private final int ackMode;
+    private int hash; // cached hash code, derived from ackMode and transacted in the constructor
+
+    public SessionKey(boolean transacted, int ackMode) {
+        this.transacted = transacted;
+        this.ackMode = ackMode;
+        hash = ackMode;
+        if (transacted) {
+            hash = 31 * hash + 1; // transacted keys hash differently from non-transacted keys with the same ackMode
+        }
+    }
+
+    public int hashCode() {
+        return hash;
+    }
+
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        }
+        if (that instanceof SessionKey) {
+            return equals((SessionKey) that);
+        }
+        return false;
+    }
+
+    public boolean equals(SessionKey that) { // typed overload; returns false (instead of throwing) for null
+        return that != null && this.transacted == that.transacted && this.ackMode == that.ackMode;
+    }
+
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    public int getAckMode() {
+        return ackMode;
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.PoolableObjectFactory;
+
+/**
+ * Represents the session pool for a given JMS connection.
+ * Also acts as the {@link PoolableObjectFactory} of the wrapped {@link ObjectPool}: the constructor registers
+ * this instance via setFactory, and makeObject() builds the PooledSession instances that the pool hands out.
+ */
+public class SessionPool implements PoolableObjectFactory {
+    private ConnectionPool connectionPool;
+    private SessionKey key;
+    private ObjectPool sessionPool;
+
+    public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) {
+        this.connectionPool = connectionPool;
+        this.key = key;
+        this.sessionPool = sessionPool;
+        sessionPool.setFactory(this); // the pool calls back into makeObject()/destroyObject() below
+    }
+
+    public void close() throws Exception {
+        if (sessionPool != null) {
+            sessionPool.close();
+        }
+        sessionPool = null; // from now on getSessionPool() throws JMSException("Already closed")
+    }
+
+    public PooledSession borrowSession() throws JMSException {
+        try {
+            Object object = getSessionPool().borrowObject();
+            return (PooledSession)object;
+        } catch (JMSException e) {
+            throw e; // JMS failures are propagated as-is
+        } catch (Exception e) {
+            throw JMSExceptionSupport.create(e); // anything else goes through JMSExceptionSupport
+        }
+    }
+
+    public void returnSession(PooledSession session) throws JMSException {
+        // let's check if we are already closed
+        getConnection();
+        try {
+            connectionPool.onSessionReturned(session); // notify the owning connection pool first, then give the session back
+            getSessionPool().returnObject(session);
+        } catch (Exception e) {
+            throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e);
+        }
+    }
+
+    public void invalidateSession(PooledSession session) throws JMSException {
+        try {
+            connectionPool.onSessionInvalidated(session); // notify the owning connection pool first, then drop the session
+            getSessionPool().invalidateObject(session);
+        } catch (Exception e) {
+            throw JMSExceptionSupport.create("Failed to invalidate session: " + e, e);
+        }
+    }
+
+    // PoolableObjectFactory methods
+    // -------------------------------------------------------------------------
+    public Object makeObject() throws Exception { // wraps a new XASession for XA connections, a plain Session otherwise
+    	if (getConnection() instanceof XAConnection) {
+    		return new PooledSession(createXaSession(), this, key.isTransacted());
+    	} else {
+    		return new PooledSession(createSession(), this, key.isTransacted());
+    	}
+    }
+
+    public void destroyObject(Object o) throws Exception {
+        PooledSession session = (PooledSession)o;
+        session.getInternalSession().close(); // closes the session wrapped by the PooledSession
+    }
+
+    public boolean validateObject(Object o) {
+        return true; // no validation is performed; every pooled object is reported as valid
+    }
+
+    public void activateObject(Object o) throws Exception { // no-op
+    }
+
+    public void passivateObject(Object o) throws Exception { // no-op
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected ObjectPool getSessionPool() throws JMSException {
+        if (sessionPool == null) {
+            throw new JMSException("Already closed"); // sessionPool is only null after close()
+        }
+        return sessionPool;
+    }
+
+    protected Connection getConnection() throws JMSException {
+        return connectionPool.getConnection();
+    }
+
+    protected Session createSession() throws JMSException {
+        return getConnection().createSession(key.isTransacted(), key.getAckMode()); // session settings come from the SessionKey
+    }
+    
+    protected XASession createXaSession() throws JMSException {
+        return ((XAConnection)getConnection()).createXASession(); // unchecked cast; makeObject() only calls this after an instanceof check
+    }
+
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.commons.pool.ObjectPoolFactory;
+
+/**
+ * An XA-aware connection pool.  When a session is created and an xa transaction is active,
+ * the session will automatically be enlisted in the current transaction.
+ * Each enlisted session takes one pool reference, which is released again in afterCompletion.
+ * @author gnodet
+ */
+public class XaConnectionPool extends ConnectionPool {
+
+    private TransactionManager transactionManager;
+
+    public XaConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) throws JMSException {
+        super(connection, poolFactory);
+        this.transactionManager = transactionManager;
+    }
+
+    public Session createSession(boolean transacted, int ackMode) throws JMSException {
+        PooledSession session = null;
+        try {
+            boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
+            if (isXa) {
+                transacted = true; // inside a transaction the caller's settings are overridden
+                ackMode = Session.SESSION_TRANSACTED;
+                session = (PooledSession) super.createXaSession(transacted, ackMode);
+                session.setIgnoreClose(true); // close() calls are ignored until the transaction completes
+                session.setIsXa(true);
+                transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
+                incrementReferenceCount(); // released in Synchronization.afterCompletion
+                transactionManager.getTransaction().enlistResource(createXaResource(session));
+            } else {
+                session = (PooledSession) super.createSession(transacted, ackMode);
+                session.setIgnoreClose(false);
+            }
+            return session;
+        } catch (RollbackException e) {
+            final JMSException jmsException = new JMSException("Rollback Exception");
+            jmsException.initCause(e);
+            throw jmsException;
+        } catch (SystemException e) {
+            final JMSException jmsException = new JMSException("System Exception");
+            jmsException.initCause(e);
+            throw jmsException;
+        }
+    }
+
+    protected XAResource createXaResource(PooledSession session) throws JMSException { // hook for subclasses to decorate the XAResource
+        return session.getXAResource();
+    }
+    
+    
+    protected class Synchronization implements javax.transaction.Synchronization { // returns the session to the pool once the transaction ends
+        private final PooledSession session;
+
+        private Synchronization(PooledSession session) {
+            this.session = session;
+        }
+
+        public void beforeCompletion() { // nothing to do before completion
+        }
+        
+        public void afterCompletion(int status) {
+            try {
+                session.setIgnoreClose(false); // let the following close() take effect; this will return session to the pool
+                session.close();
+                session.setIgnoreClose(true);
+                session.setIsXa(false);
+            } catch (JMSException e) {
+                throw new RuntimeException(e);
+            } finally {
+                decrementReferenceCount(); // always release the reference taken in createSession, even if close() failed
+            }
+        }
+    }
+    
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.transaction.TransactionManager;
+
+import org.apache.aries.transaction.jms.PooledConnectionFactory;
+
+/**
+ * A pooled connection factory that automatically enlists
+ * sessions in the current active XA transaction if any.
+ */
+public class XaPooledConnectionFactory extends PooledConnectionFactory {
+
+    private XAConnectionFactory xaConnectionFactory;
+    private TransactionManager transactionManager;
+    
+    public XaPooledConnectionFactory() {
+        super();
+    }
+
+    public XAConnectionFactory getXaConnectionFactory() {
+        return xaConnectionFactory;
+    }
+
+    public void setXaConnectionFactory(XAConnectionFactory xaConnectionFactory) {
+    	this.xaConnectionFactory = xaConnectionFactory;
+        setConnectionFactory(new ConnectionFactory() { // adapter: exposes the XA factory through the plain ConnectionFactory interface
+            public Connection createConnection() throws JMSException {
+                return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection(); // reads the field, so it returns XAConnections from the currently set XA factory
+            }
+            public Connection createConnection(String userName, String password) throws JMSException {
+                return XaPooledConnectionFactory.this.xaConnectionFactory.createXAConnection(userName, password);
+            }
+        });
+    }
+
+    public TransactionManager getTransactionManager() {
+        return transactionManager;
+    }
+
+    /**
+     * The XA TransactionManager to use to enlist the JMS sessions into.
+     *
+     * @org.apache.xbean.Property required=true
+     */
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
+    protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
+        return new XaConnectionPool((XAConnection) connection, getPoolFactory(), getTransactionManager()); // unchecked cast: a Connection that is not an XAConnection fails with ClassCastException
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html Tue Jul 31 17:08:04 2012
@@ -0,0 +1,26 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like 
+Spring's <a href="http://activemq.org/Spring+Support">JmsTemplate</a>.
+
+</body>
+</html>

Propchange: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml (added)
+++ aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml Tue Jul 31 17:08:04 2012
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed  under the  License is distributed on an "AS IS" BASIS,
+WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
+implied.
+
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
+
+    <service interface="org.apache.aries.blueprint.NamespaceHandler">
+        <service-properties>
+            <entry key="osgi.service.blueprint.namespace" value="http://aries.apache.org/xmlns/transaction-jms/1.0"/>
+        </service-properties>
+        <bean class="org.apache.xbean.blueprint.context.impl.XBeanNamespaceHandler">
+            <argument value="http://aries.apache.org/xmlns/transaction-jms/1.0"/>
+            <argument value="org.apache.aries.transaction.jms.xsd"/>
+            <argument ref="blueprintBundle"/>
+            <argument value="META-INF/services/org/apache/xbean/spring/http/aries.apache.org/xmlns/transaction-jms/1.0"/>
+        </bean>
+    </service>
+
+</blueprint>