You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/07/31 22:25:44 UTC

svn commit: r799740 - in /cxf/branches/2.2.x-fixes: ./ integration/jca/src/main/java/org/apache/cxf/jca/inbound/ rt/transports/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/

Author: dkulp
Date: Fri Jul 31 20:25:43 2009
New Revision: 799740

URL: http://svn.apache.org/viewvc?rev=799740&view=rev
Log:
Merged revisions 799724 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/trunk

........
  r799724 | dkulp | 2009-07-31 15:54:36 -0400 (Fri, 31 Jul 2009) | 3 lines
  
  [CXF-2372] JCA + XA transaction work
  Patch from Seumas Soltysik applied (with a lot of mods due to conflicts
  on trunk)
........

Added:
    cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
      - copied unchanged from r799724, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
    cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java
      - copied unchanged from r799724, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java
Modified:
    cxf/branches/2.2.x-fixes/   (props changed)
    cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java
    cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java
    cxf/branches/2.2.x-fixes/rt/transports/jms/pom.xml
    cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
    cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java

Propchange: cxf/branches/2.2.x-fixes/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 31 20:25:43 2009
@@ -1 +1 @@
-/cxf/trunk:782728-782730,783097,783294,783396,784059,784181-784184,784893,784895,785279-785282,785468,785621,785624,785651,785734,785866,786142,786271-786272,786395,786512,786514,786582-786583,786638,786647,786850,787200,787269,787277-787279,787290-787291,787305,787323,787366,787849,788030,788060,788187,788444,788451,788703,788752,788774,788819-788820,789013,789371,789387,789420,789527-789530,789704-789705,789788,789811,789896-789901,790074,790094,790134,790188,790294,790553,790637-790644,790868,791301,791354,791538,791753,791947,792007,792096,792183,792261-792265,792271,792604,792683-792685,792975,792985,793059,793570,794297,794396,794680,794728,794771,794778-794780,794892,795044,795104,795160,795583,795907,796022-796023,796352,796593,796741,796780,796994-796997,797117,797159,797192,797194,797231-797233,797442,797505,797517,797534,797581-797583,797587,797640,797651,797699,797882-797883,798344-798346,798363,798461,798479,798533,798551,798557,798561-798562,798570,798573,79858
 4,798654,798748-798749,798891,798929-798930,799267,799439,799448,799637,799723
+/cxf/trunk:782728-782730,783097,783294,783396,784059,784181-784184,784893,784895,785279-785282,785468,785621,785624,785651,785734,785866,786142,786271-786272,786395,786512,786514,786582-786583,786638,786647,786850,787200,787269,787277-787279,787290-787291,787305,787323,787366,787849,788030,788060,788187,788444,788451,788703,788752,788774,788819-788820,789013,789371,789387,789420,789527-789530,789704-789705,789788,789811,789896-789901,790074,790094,790134,790188,790294,790553,790637-790644,790868,791301,791354,791538,791753,791947,792007,792096,792183,792261-792265,792271,792604,792683-792685,792975,792985,793059,793570,794297,794396,794680,794728,794771,794778-794780,794892,795044,795104,795160,795583,795907,796022-796023,796352,796593,796741,796780,796994-796997,797117,797159,797192,797194,797231-797233,797442,797505,797517,797534,797581-797583,797587,797640,797651,797699,797882-797883,798344-798346,798363,798461,798479,798533,798551,798557,798561-798562,798570,798573,79858
 4,798654,798748-798749,798891,798929-798930,799267,799439,799448,799637,799723-799724

Propchange: cxf/branches/2.2.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java?rev=799740&r1=799739&r2=799740&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java (original)
+++ cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java Fri Jul 31 20:25:43 2009
@@ -53,8 +53,20 @@
     
     @Override
     public Object getServiceObject(Exchange context) {
+        MessageEndpoint ep = null;
+        MessageEndpoint epFromMessage = null;
+        
+        if (context != null) {
+            epFromMessage = context.getInMessage().getContent(MessageEndpoint.class);
+        }
+         
+        if (epFromMessage == null) {
+            ep = getMessageEndpoint();
+        } else {
+            ep = epFromMessage;
+        }
+        
         Object target = null;
-        MessageEndpoint ep = getMessageEndpoint();
 
         if (ep == null) {
             LOG.log(Level.SEVERE, "Failed to obtain MessageEndpoint");
@@ -68,9 +80,11 @@
             LOG.log(Level.SEVERE, "Failed to obtain service object " + targetJndiName, e);
             return null;
         } finally {
-            releaseEndpoint(ep);
+            if (epFromMessage == null) {
+                releaseEndpoint(ep);
+            }
         }
-        
+
         return target;
     }
 

Modified: cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java?rev=799740&r1=799739&r2=799740&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java (original)
+++ cxf/branches/2.2.x-fixes/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java Fri Jul 31 20:25:43 2009
@@ -18,6 +18,7 @@
  */
 package org.apache.cxf.jca.inbound;
 
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
@@ -39,6 +40,7 @@
 import org.apache.cxf.jaxws.EndpointImpl;
 import org.apache.cxf.jaxws.EndpointUtils;
 import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
+import org.apache.cxf.service.model.EndpointInfo;
 
 /**
  *
@@ -51,6 +53,9 @@
 public class MDBActivationWork implements Work {
     
     private static final Logger LOG = LogUtils.getL7dLogger(MDBActivationWork.class);
+    private static final String MESSAGE_LISTENER_METHOD = "lookupTargetObject";
+    private static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory";
+    private static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod";
 
     private MDBActivationSpec spec;
     private MessageEndpointFactory endpointFactory;
@@ -126,6 +131,16 @@
         if (bus == null) {
             bus = BusFactory.getDefaultBus();
         }
+        
+        Method method = null;
+
+        try {
+            Class clazz = org.apache.cxf.jca.inbound.DispatchMDBMessageListener.class;
+            method = clazz.getMethod(MESSAGE_LISTENER_METHOD, new Class[] {String.class});
+        } catch (Exception ex) {
+            LOG.severe("Failed to get method " + MESSAGE_LISTENER_METHOD
+                       + " from class DispatchMDBMessageListener.");
+        }
 
         Server server = createServer(bus, serviceClass, invoker);
         
@@ -134,6 +149,10 @@
             return;
         }
         
+        EndpointInfo  ei = server.getEndpoint().getEndpointInfo();
+        ei.setProperty(MESSAGE_ENDPOINT_FACTORY, endpointFactory);
+        ei.setProperty(MDB_TRANSACTED_METHOD, method);
+
         server.start();
         
         // save the server for clean up later

Modified: cxf/branches/2.2.x-fixes/rt/transports/jms/pom.xml
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/transports/jms/pom.xml?rev=799740&r1=799739&r2=799740&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/transports/jms/pom.xml (original)
+++ cxf/branches/2.2.x-fixes/rt/transports/jms/pom.xml Fri Jul 31 20:25:43 2009
@@ -91,6 +91,11 @@
         	<version>${spring.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
+            <scope>provided</scope>
+        </dependency>        
     </dependencies>
 
     <build>

Modified: cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=799740&r1=799739&r2=799740&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Fri Jul 31 20:25:43 2009
@@ -20,6 +20,7 @@
 
 import javax.jms.ConnectionFactory;
 import javax.jms.Message;
+import javax.jms.XAConnectionFactory;
 
 import org.apache.cxf.configuration.ConfigurationException;
 import org.springframework.beans.factory.InitializingBean;
@@ -407,7 +408,11 @@
             if (wrapInSingleConnectionFactory && !(connectionFactory instanceof SingleConnectionFactory)) {
                 SingleConnectionFactory scf;
                 if (useJms11) {
-                    scf = new SingleConnectionFactory(connectionFactory);
+                    if (connectionFactory instanceof XAConnectionFactory) {
+                        scf = new XASingleConnectionFactory(connectionFactory);
+                    } else {
+                        scf = new SingleConnectionFactory(connectionFactory);
+                    }
                 } else {
                     scf = new SingleConnectionFactory102(connectionFactory, pubSubDomain);
                 }

Modified: cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=799740&r1=799739&r2=799740&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Jul 31 20:25:43 2009
@@ -41,6 +41,7 @@
 import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.resource.spi.endpoint.MessageEndpoint;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -76,6 +77,7 @@
 
     private JMSConfiguration jmsConfig;
     private Bus bus;
+    private EndpointInfo ei;
     private DefaultMessageListenerContainer jmsListener;
     private Collection<JMSContinuation> continuations = 
         new ConcurrentLinkedQueue<JMSContinuation>();
@@ -83,6 +85,7 @@
     public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
         super(b, getTargetReference(info, b), info);
         this.bus = b;
+        this.ei = info;
         this.jmsConfig = jmsConfig;
         info.setProperty(OneWayProcessorInterceptor.USE_ORIGINAL_THREAD, Boolean.TRUE);
     }
@@ -105,7 +108,7 @@
         org.apache.cxf.common.i18n.Message msg = 
             new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_DESTINATION", LOG, name);
         jmsConfig.ensureProperlyConfigured(msg);
-        jmsListener = JMSFactory.createJmsListener(jmsConfig, this, 
+        jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this, 
                                                    jmsConfig.getTargetDestination(), null, false);
     }
 
@@ -193,6 +196,12 @@
             
             BusFactory.setThreadDefaultBus(bus);
 
+            MessageEndpoint ep = JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.get();
+            if (ep != null) {
+                inMessage.setContent(MessageEndpoint.class, ep);
+                JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.remove();
+            }
+            
             // handle the incoming message
             incomingObserver.onMessage(inMessage);
         } catch (SuspendedInvocationException ex) {

Modified: cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=799740&r1=799739&r2=799740&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original)
+++ cxf/branches/2.2.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Fri Jul 31 20:25:43 2009
@@ -18,6 +18,7 @@
  */
 package org.apache.cxf.transport.jms;
 
+import java.lang.reflect.Method;
 import java.util.logging.Logger;
 
 import javax.jms.ConnectionFactory;
@@ -26,9 +27,11 @@
 import javax.jms.MessageListener;
 import javax.jms.QueueSession;
 import javax.jms.Session;
+import javax.jms.XAConnectionFactory;
 import javax.naming.NamingException;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.service.model.EndpointInfo;
 import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.JmsTemplate102;
@@ -110,6 +113,37 @@
         return jmsTemplate;
     }
 
+    public static DefaultMessageListenerContainer createJmsListener(EndpointInfo ei,
+                                                                    JMSConfiguration jmsConfig,
+                                                                    MessageListener listenerHandler,
+                                                                    String destinationName, 
+                                                                    String messageSelectorPrefix,
+                                                                    boolean userCID) {
+        DefaultMessageListenerContainer jmsListener = null;
+        
+        if (jmsConfig.isUseJms11()) {
+            //Check to see if transport is being used in JCA RA with XA
+            Method method = ei.getProperty(JCATransactionalMessageListenerContainer.MDB_TRANSACTED_METHOD,
+                                           java.lang.reflect.Method.class);
+            if (method != null 
+                && 
+                jmsConfig.getConnectionFactory() instanceof XAConnectionFactory) {
+                jmsListener = new JCATransactionalMessageListenerContainer(ei); 
+            } else {
+                jmsListener = new DefaultMessageListenerContainer();
+            }
+        } else {
+            jmsListener = new DefaultMessageListenerContainer102();
+        }
+        
+        return createJmsListener(jmsListener,
+                                 jmsConfig,
+                                 listenerHandler,
+                                 destinationName, 
+                                 messageSelectorPrefix,
+                                 userCID);            
+    }
+    
     /**
      * Create and start listener using configuration information from jmsConfig. Uses
      * resolveOrCreateDestination to determine the destination for the listener.
@@ -127,6 +161,23 @@
                                                                     boolean userCID) {
         DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
             ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102();
+        
+        return createJmsListener(jmsListener,
+                                 jmsConfig,
+                                 listenerHandler,
+                                 destinationName, 
+                                 messageSelectorPrefix,
+                                 userCID);    
+    }
+    
+    public static DefaultMessageListenerContainer 
+    createJmsListener(DefaultMessageListenerContainer jmsListener,
+                      JMSConfiguration jmsConfig,
+                      MessageListener listenerHandler,
+                      String destinationName, 
+                      String messageSelectorPrefix,
+                      boolean userCID) {
+        
         jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
         jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
         jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());