You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/07/31 21:54:36 UTC

svn commit: r799724 - in /cxf/trunk: integration/jca/src/main/java/org/apache/cxf/jca/inbound/ rt/transports/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/

Author: dkulp
Date: Fri Jul 31 19:54:36 2009
New Revision: 799724

URL: http://svn.apache.org/viewvc?rev=799724&view=rev
Log:
[CXF-2372] JCA + XA transaction work
Patch from Seumas Soltysik applied (with a lot of mods due to conflicts
on trunk)

Added:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java   (with props)
Modified:
    cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java
    cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java
    cxf/trunk/rt/transports/jms/pom.xml
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java

Modified: cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java (original)
+++ cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java Fri Jul 31 19:54:36 2009
@@ -53,8 +53,20 @@
     
     @Override
     public Object getServiceObject(Exchange context) {
+        MessageEndpoint ep = null;
+        MessageEndpoint epFromMessage = null;
+        
+        if (context != null) {
+            epFromMessage = context.getInMessage().getContent(MessageEndpoint.class);
+        }
+         
+        if (epFromMessage == null) {
+            ep = getMessageEndpoint();
+        } else {
+            ep = epFromMessage;
+        }
+        
         Object target = null;
-        MessageEndpoint ep = getMessageEndpoint();
 
         if (ep == null) {
             LOG.log(Level.SEVERE, "Failed to obtain MessageEndpoint");
@@ -68,9 +80,11 @@
             LOG.log(Level.SEVERE, "Failed to obtain service object " + targetJndiName, e);
             return null;
         } finally {
-            releaseEndpoint(ep);
+            if (epFromMessage == null) {
+                releaseEndpoint(ep);
+            }
         }
-        
+
         return target;
     }
 

Modified: cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java
URL: http://svn.apache.org/viewvc/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java (original)
+++ cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java Fri Jul 31 19:54:36 2009
@@ -18,6 +18,7 @@
  */
 package org.apache.cxf.jca.inbound;
 
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
@@ -39,6 +40,7 @@
 import org.apache.cxf.jaxws.EndpointImpl;
 import org.apache.cxf.jaxws.EndpointUtils;
 import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
+import org.apache.cxf.service.model.EndpointInfo;
 
 /**
  *
@@ -51,6 +53,9 @@
 public class MDBActivationWork implements Work {
     
     private static final Logger LOG = LogUtils.getL7dLogger(MDBActivationWork.class);
+    private static final String MESSAGE_LISTENER_METHOD = "lookupTargetObject";
+    private static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory";
+    private static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod";
 
     private MDBActivationSpec spec;
     private MessageEndpointFactory endpointFactory;
@@ -126,6 +131,16 @@
         if (bus == null) {
             bus = BusFactory.getDefaultBus();
         }
+        
+        Method method = null;
+
+        try {
+            Class clazz = org.apache.cxf.jca.inbound.DispatchMDBMessageListener.class;
+            method = clazz.getMethod(MESSAGE_LISTENER_METHOD, new Class[] {String.class});
+        } catch (Exception ex) {
+            LOG.severe("Failed to get method " + MESSAGE_LISTENER_METHOD
+                       + " from class DispatchMDBMessageListener.");
+        }
 
         Server server = createServer(bus, serviceClass, invoker);
         
@@ -134,6 +149,10 @@
             return;
         }
         
+        EndpointInfo  ei = server.getEndpoint().getEndpointInfo();
+        ei.setProperty(MESSAGE_ENDPOINT_FACTORY, endpointFactory);
+        ei.setProperty(MDB_TRANSACTED_METHOD, method);
+
         server.start();
         
         // save the server for clean up later

Modified: cxf/trunk/rt/transports/jms/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/pom.xml?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/pom.xml (original)
+++ cxf/trunk/rt/transports/jms/pom.xml Fri Jul 31 19:54:36 2009
@@ -112,6 +112,11 @@
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
+            <scope>provided</scope>
+        </dependency>        
     </dependencies>
 
     <build>

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java?rev=799724&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java Fri Jul 31 19:54:36 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms;
+
+import java.lang.reflect.Method;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.XASession;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.xa.XAResource;
+
+import org.apache.cxf.service.model.EndpointInfo;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.JmsUtils;
+
+/**
+ * Polling listener container used when the JMS transport runs inside a JCA
+ * resource adapter with an XA connection factory.
+ * <p>
+ * Each receive attempt creates its own {@link XASession}, hands that session's
+ * {@link XAResource} to a {@link MessageEndpoint} obtained from the
+ * {@link MessageEndpointFactory}, and brackets the receive with
+ * {@code beforeDelivery}/{@code afterDelivery} calls on that endpoint.
+ */
+public class JCATransactionalMessageListenerContainer extends DefaultMessageListenerContainer {
+    /** Endpoint of the delivery in progress on the current thread; read by JMSDestination. */
+    static final ThreadLocal<MessageEndpoint> ENDPOINT_LOCAL = new ThreadLocal<MessageEndpoint>();
+    /** EndpointInfo property key under which the MessageEndpointFactory is stored. */
+    static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory";
+    /** EndpointInfo property key under which the Method passed to beforeDelivery is stored. */
+    static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod";
+    private MessageEndpointFactory factory;
+    private Method method;
+
+    /**
+     * Reads the endpoint factory and the delivery method from the endpoint
+     * properties and sets the cache level to {@code CACHE_CONNECTION}.
+     *
+     * @param ei endpoint info carrying the {@link #MESSAGE_ENDPOINT_FACTORY}
+     *           and {@link #MDB_TRANSACTED_METHOD} properties
+     */
+    public JCATransactionalMessageListenerContainer(EndpointInfo ei) {
+        factory = ei.getProperty(MESSAGE_ENDPOINT_FACTORY, MessageEndpointFactory.class);
+        method = ei.getProperty(MDB_TRANSACTED_METHOD, Method.class);
+        this.setCacheLevel(CACHE_CONNECTION);
+    }
+
+    /**
+     * Performs one receive attempt inside a message endpoint delivery.
+     * <p>
+     * The {@code session} and {@code consumer} arguments are not used: a new XA
+     * session and consumer are created on the shared connection for every call
+     * and closed again before returning.
+     *
+     * @param invoker passed through to {@code doReceiveAndExecute}
+     * @param session not used
+     * @param consumer not used
+     * @return the value returned by {@code doReceiveAndExecute}
+     * @throws JMSException if any step fails; a non-JMS failure is wrapped with
+     *                      the original exception attached as linked exception
+     */
+    @Override
+    protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
+        throws JMSException {
+        boolean messageReceived = false;
+        MessageEndpoint ep = null;
+        MessageConsumer mc = null;
+        XASession xa = null;
+        Session s = null;
+
+        try {
+            xa = (XASession)createSession(getSharedConnection());
+            XAResource xar = xa.getXAResource();
+            s = xa.getSession();
+            mc = s.createConsumer(getDestination());
+            ep = factory.createEndpoint(xar);
+            ENDPOINT_LOCAL.set(ep);
+            ep.beforeDelivery(method);
+            messageReceived = doReceiveAndExecute(invoker, s, mc, null);
+            ep.afterDelivery();
+        } catch (JMSException ex) {
+            // Already the declared type: rethrow untouched so subclass and error code survive.
+            throw ex;
+        } catch (Exception ex) {
+            JMSException wrapped = new JMSException(ex.getMessage());
+            wrapped.setLinkedException(ex);
+            if (wrapped.getCause() == null) {
+                wrapped.initCause(ex);
+            }
+            throw wrapped;
+        } finally {
+            // Clear the thread-local so a released endpoint cannot leak into a later delivery.
+            ENDPOINT_LOCAL.remove();
+            try {
+                // ep is still null when the failure happened before createEndpoint returned.
+                if (ep != null) {
+                    ep.release();
+                }
+            } finally {
+                JmsUtils.closeMessageConsumer(mc);
+                JmsUtils.closeSession(xa);
+                JmsUtils.closeSession(s);
+            }
+        }
+
+        return messageReceived;
+    }
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Fri Jul 31 19:54:36 2009
@@ -20,6 +20,7 @@
 
 import javax.jms.ConnectionFactory;
 import javax.jms.Message;
+import javax.jms.XAConnectionFactory;
 
 import org.apache.cxf.configuration.ConfigurationException;
 import org.springframework.beans.factory.InitializingBean;
@@ -419,7 +420,11 @@
             if (wrapInSingleConnectionFactory && !(connectionFactory instanceof SingleConnectionFactory)) {
                 SingleConnectionFactory scf;
                 if (useJms11) {
-                    scf = new SingleConnectionFactory(connectionFactory);
+                    if (connectionFactory instanceof XAConnectionFactory) {
+                        scf = new XASingleConnectionFactory(connectionFactory);
+                    } else {
+                        scf = new SingleConnectionFactory(connectionFactory);
+                    }
                 } else {
                     scf = new SingleConnectionFactory102(connectionFactory, pubSubDomain);
                 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Jul 31 19:54:36 2009
@@ -39,6 +39,7 @@
 import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.resource.spi.endpoint.MessageEndpoint;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -73,6 +74,7 @@
 
     private JMSConfiguration jmsConfig;
     private Bus bus;
+    private EndpointInfo ei;
     private DefaultMessageListenerContainer jmsListener;
     private Collection<JMSContinuation> continuations = 
         new ConcurrentLinkedQueue<JMSContinuation>();
@@ -80,6 +82,7 @@
     public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
         super(b, getTargetReference(info, b), info);
         this.bus = b;
+        this.ei = info;
         this.jmsConfig = jmsConfig;
         info.setProperty(OneWayProcessorInterceptor.USE_ORIGINAL_THREAD, Boolean.TRUE);
     }
@@ -102,8 +105,8 @@
         org.apache.cxf.common.i18n.Message msg = 
             new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_DESTINATION", LOG, name);
         jmsConfig.ensureProperlyConfigured(msg);
-        jmsListener = JMSFactory.createJmsListener(jmsConfig, this, 
-                                                   jmsConfig.getTargetDestination(), null);
+        jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this, 
+                                                   jmsConfig.getTargetDestination());
     }
 
     public void deactivate() {
@@ -190,6 +193,12 @@
             
             BusFactory.setThreadDefaultBus(bus);
 
+            MessageEndpoint ep = JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.get();
+            if (ep != null) {
+                inMessage.setContent(MessageEndpoint.class, ep);
+                JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.remove();
+            }
+            
             // handle the incoming message
             incomingObserver.onMessage(inMessage);
         } catch (SuspendedInvocationException ex) {

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Fri Jul 31 19:54:36 2009
@@ -18,6 +18,7 @@
  */
 package org.apache.cxf.transport.jms;
 
+import java.lang.reflect.Method;
 import java.util.logging.Logger;
 
 import javax.jms.ConnectionFactory;
@@ -26,9 +27,11 @@
 import javax.jms.MessageListener;
 import javax.jms.QueueSession;
 import javax.jms.Session;
+import javax.jms.XAConnectionFactory;
 import javax.naming.NamingException;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.service.model.EndpointInfo;
 import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.JmsTemplate102;
@@ -112,23 +115,49 @@
         }
         return jmsTemplate;
     }
-
     /**
      * Create and start listener using configuration information from jmsConfig. Uses
      * resolveOrCreateDestination to determine the destination for the listener.
      * 
+     * @param ei the EndpointInfo for the listener
      * @param jmsConfig configuration information
      * @param listenerHandler object to be called when a message arrives
      * @param destinationName null for temp dest or a destination name
-     * @param conduitId prefix for the messageselector
      * @return
      */
-    public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
+    public static DefaultMessageListenerContainer createJmsListener(EndpointInfo ei,
+                                                                    JMSConfiguration jmsConfig,
                                                                     MessageListener listenerHandler,
-                                                                    String destinationName, 
-                                                                    String conduitId) {
-        DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
-            ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102();
+                                                                    String destinationName) {
+        DefaultMessageListenerContainer jmsListener = null;
+        
+        if (jmsConfig.isUseJms11()) {
+            //Check to see if transport is being used in JCA RA with XA
+            Method method = ei.getProperty(JCATransactionalMessageListenerContainer.MDB_TRANSACTED_METHOD,
+                                           java.lang.reflect.Method.class);
+            if (method != null 
+                && 
+                jmsConfig.getConnectionFactory() instanceof XAConnectionFactory) {
+                jmsListener = new JCATransactionalMessageListenerContainer(ei); 
+            } else {
+                jmsListener = new DefaultMessageListenerContainer();
+            }
+        } else {
+            jmsListener = new DefaultMessageListenerContainer102();
+        }
+        
+        return createJmsListener(jmsListener,
+                                 jmsConfig,
+                                 listenerHandler,
+                                 destinationName);            
+    }
+    
+    public static DefaultMessageListenerContainer createJmsListener(
+                          DefaultMessageListenerContainer jmsListener,
+                          JMSConfiguration jmsConfig,
+                          MessageListener listenerHandler,
+                          String destinationName) {
+        
         jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
         jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
         jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
@@ -161,15 +190,11 @@
         if (jmsConfig.isAcceptMessagesWhileStopping()) {
             jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping());
         }
-        /*String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
-        if (conduitId != null && jmsConfig.isUseConduitIdSelector()) {
-            jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
-                                        + staticSelectorPrefix 
-                                        + conduitId + "%'");
-        } else if (staticSelectorPrefix.length() > 0) {
+        String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
+        if (staticSelectorPrefix.length() > 0) {
             jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
                                         + staticSelectorPrefix +  "%'");
-        }*/
+        }
         if (jmsConfig.getDestinationResolver() != null) {
             jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
         }

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java?rev=799724&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java Fri Jul 31 19:54:36 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+
+import org.springframework.jms.connection.SingleConnectionFactory;
+
+/**
+ * {@link SingleConnectionFactory} variant for XA-capable targets: the shared
+ * connection is created through {@link XAConnectionFactory} and sessions are
+ * created through {@link XAConnection}.
+ */
+public class XASingleConnectionFactory extends SingleConnectionFactory {
+
+    /**
+     * @param targetConnectionFactory factory handed to the superclass; it is
+     *        cast to {@link XAConnectionFactory} when a connection is created
+     */
+    public XASingleConnectionFactory(ConnectionFactory targetConnectionFactory) {
+        super(targetConnectionFactory);
+    }
+
+    /**
+     * Creates the connection via {@code createXAConnection()} on the target factory.
+     *
+     * @return a new XA connection
+     * @throws JMSException if the target factory fails to create the connection
+     */
+    protected Connection doCreateConnection() throws JMSException {
+        return ((XAConnectionFactory)getTargetConnectionFactory()).createXAConnection();
+    }
+
+    /**
+     * Creates an XA session on the given connection; {@code mode} is not consulted.
+     *
+     * @param con connection to create the session on, cast to {@link XAConnection}
+     * @param mode not used
+     * @return a new XA session
+     * @throws JMSException if the session cannot be created
+     */
+    protected Session getSession(Connection con, Integer mode) throws JMSException {
+        return ((XAConnection)con).createXASession();
+    }
+
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date