You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/07/31 21:54:36 UTC
svn commit: r799724 - in /cxf/trunk:
integration/jca/src/main/java/org/apache/cxf/jca/inbound/ rt/transports/jms/
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/
Author: dkulp
Date: Fri Jul 31 19:54:36 2009
New Revision: 799724
URL: http://svn.apache.org/viewvc?rev=799724&view=rev
Log:
[CXF-2372] JCA + XA transaction work
Patch from Seumas Soltysik applied (with a lot of mods due to conflicts
on trunk)
Added:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java (with props)
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java (with props)
Modified:
cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java
cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java
cxf/trunk/rt/transports/jms/pom.xml
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
Modified: cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java (original)
+++ cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/DispatchMDBInvoker.java Fri Jul 31 19:54:36 2009
@@ -53,8 +53,20 @@
@Override
public Object getServiceObject(Exchange context) {
+ MessageEndpoint ep = null;
+ MessageEndpoint epFromMessage = null;
+
+ if (context != null) {
+ epFromMessage = context.getInMessage().getContent(MessageEndpoint.class);
+ }
+
+ if (epFromMessage == null) {
+ ep = getMessageEndpoint();
+ } else {
+ ep = epFromMessage;
+ }
+
Object target = null;
- MessageEndpoint ep = getMessageEndpoint();
if (ep == null) {
LOG.log(Level.SEVERE, "Failed to obtain MessageEndpoint");
@@ -68,9 +80,11 @@
LOG.log(Level.SEVERE, "Failed to obtain service object " + targetJndiName, e);
return null;
} finally {
- releaseEndpoint(ep);
+ if (epFromMessage == null) {
+ releaseEndpoint(ep);
+ }
}
-
+
return target;
}
Modified: cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java
URL: http://svn.apache.org/viewvc/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java (original)
+++ cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/inbound/MDBActivationWork.java Fri Jul 31 19:54:36 2009
@@ -18,6 +18,7 @@
*/
package org.apache.cxf.jca.inbound;
+import java.lang.reflect.Method;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
@@ -39,6 +40,7 @@
import org.apache.cxf.jaxws.EndpointImpl;
import org.apache.cxf.jaxws.EndpointUtils;
import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
+import org.apache.cxf.service.model.EndpointInfo;
/**
*
@@ -51,6 +53,9 @@
public class MDBActivationWork implements Work {
private static final Logger LOG = LogUtils.getL7dLogger(MDBActivationWork.class);
+ private static final String MESSAGE_LISTENER_METHOD = "lookupTargetObject";
+ private static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory";
+ private static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod";
private MDBActivationSpec spec;
private MessageEndpointFactory endpointFactory;
@@ -126,6 +131,16 @@
if (bus == null) {
bus = BusFactory.getDefaultBus();
}
+
+ Method method = null;
+
+ try {
+ Class clazz = org.apache.cxf.jca.inbound.DispatchMDBMessageListener.class;
+ method = clazz.getMethod(MESSAGE_LISTENER_METHOD, new Class[] {String.class});
+ } catch (Exception ex) {
+ LOG.severe("Failed to get method " + MESSAGE_LISTENER_METHOD
+ + " from class DispatchMDBMessageListener.");
+ }
Server server = createServer(bus, serviceClass, invoker);
@@ -134,6 +149,10 @@
return;
}
+ EndpointInfo ei = server.getEndpoint().getEndpointInfo();
+ ei.setProperty(MESSAGE_ENDPOINT_FACTORY, endpointFactory);
+ ei.setProperty(MDB_TRANSACTED_METHOD, method);
+
server.start();
// save the server for clean up later
Modified: cxf/trunk/rt/transports/jms/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/pom.xml?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/pom.xml (original)
+++ cxf/trunk/rt/transports/jms/pom.xml Fri Jul 31 19:54:36 2009
@@ -112,6 +112,11 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java?rev=799724&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java Fri Jul 31 19:54:36 2009
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms;
+
+import java.lang.reflect.Method;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.XASession;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.xa.XAResource;
+
+import org.apache.cxf.service.model.EndpointInfo;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.JmsUtils;
+
+/**
+ * Message listener container used when the JMS transport runs inside a JCA
+ * resource adapter with an XA connection factory.
+ * <p>
+ * For every receive attempt the container opens its own XA session on the
+ * shared connection, creates a {@link MessageEndpoint} from the configured
+ * {@link MessageEndpointFactory} using that session's {@link XAResource}, and
+ * brackets the receive with {@code beforeDelivery}/{@code afterDelivery}.
+ * The endpoint is published through {@link #ENDPOINT_LOCAL} so that the
+ * listener invoked on the same thread (JMSDestination) can attach it to the
+ * inbound message.
+ */
+public class JCATransactionalMessageListenerContainer extends DefaultMessageListenerContainer {
+    /** Endpoint for the delivery currently in progress on the calling thread. */
+    static final ThreadLocal<MessageEndpoint> ENDPOINT_LOCAL = new ThreadLocal<MessageEndpoint>();
+    /** EndpointInfo property key under which the MessageEndpointFactory is stored. */
+    static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory";
+    /** EndpointInfo property key under which the transacted MDB method is stored. */
+    static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod";
+
+    private final MessageEndpointFactory factory;
+    private final Method method;
+
+    /**
+     * Reads the endpoint factory and the transacted method from the given
+     * endpoint info and sets the cache level to {@code CACHE_CONNECTION}.
+     *
+     * @param ei endpoint info carrying the {@link #MESSAGE_ENDPOINT_FACTORY}
+     *           and {@link #MDB_TRANSACTED_METHOD} properties
+     */
+    public JCATransactionalMessageListenerContainer(EndpointInfo ei) {
+        factory = ei.getProperty(MESSAGE_ENDPOINT_FACTORY,
+                                 MessageEndpointFactory.class);
+        method = ei.getProperty(MDB_TRANSACTED_METHOD, Method.class);
+        this.setCacheLevel(CACHE_CONNECTION);
+    }
+
+    /**
+     * Performs one receive attempt inside a message endpoint delivery.
+     * <p>
+     * The {@code session} and {@code consumer} arguments are not used; a
+     * fresh XA session and consumer are created for each call and closed
+     * again before returning.
+     *
+     * @param invoker  passed through unchanged to {@code doReceiveAndExecute}
+     * @param session  ignored
+     * @param consumer ignored
+     * @return the value returned by {@code doReceiveAndExecute}
+     * @throws JMSException if the receive fails; a {@code JMSException} raised
+     *         inside is rethrown as is, any other exception is wrapped with
+     *         the original attached as linked exception and cause
+     */
+    protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
+        throws JMSException {
+        boolean messageReceived = false;
+        MessageEndpoint ep = null;
+        MessageConsumer mc = null;
+        XASession xa = null;
+        Session s = null;
+
+        try {
+            xa = (XASession)createSession(getSharedConnection());
+            XAResource xar = xa.getXAResource();
+            s = xa.getSession();
+            mc = s.createConsumer(getDestination());
+            ep = factory.createEndpoint(xar);
+            ENDPOINT_LOCAL.set(ep);
+            ep.beforeDelivery(method);
+            messageReceived = doReceiveAndExecute(invoker, s, mc, null);
+            ep.afterDelivery();
+        } catch (JMSException ex) {
+            // Already the declared type: rethrow untouched so nothing is lost.
+            throw ex;
+        } catch (Exception ex) {
+            // Keep the original failure reachable instead of copying only its message.
+            JMSException wrapped = new JMSException(ex.getMessage());
+            wrapped.setLinkedException(ex);
+            wrapped.initCause(ex);
+            throw wrapped;
+        } finally {
+            // Do not leave a released endpoint bound to this thread when no
+            // listener ran to consume it.
+            ENDPOINT_LOCAL.remove();
+            try {
+                // ep is still null when the failure happened before createEndpoint.
+                if (ep != null) {
+                    ep.release();
+                }
+            } finally {
+                // Always close the JMS resources, even if release() throws.
+                JmsUtils.closeMessageConsumer(mc);
+                JmsUtils.closeSession(xa);
+                JmsUtils.closeSession(s);
+            }
+        }
+
+        return messageReceived;
+    }
+
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Fri Jul 31 19:54:36 2009
@@ -20,6 +20,7 @@
import javax.jms.ConnectionFactory;
import javax.jms.Message;
+import javax.jms.XAConnectionFactory;
import org.apache.cxf.configuration.ConfigurationException;
import org.springframework.beans.factory.InitializingBean;
@@ -419,7 +420,11 @@
if (wrapInSingleConnectionFactory && !(connectionFactory instanceof SingleConnectionFactory)) {
SingleConnectionFactory scf;
if (useJms11) {
- scf = new SingleConnectionFactory(connectionFactory);
+ if (connectionFactory instanceof XAConnectionFactory) {
+ scf = new XASingleConnectionFactory(connectionFactory);
+ } else {
+ scf = new SingleConnectionFactory(connectionFactory);
+ }
} else {
scf = new SingleConnectionFactory102(connectionFactory, pubSubDomain);
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Jul 31 19:54:36 2009
@@ -39,6 +39,7 @@
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.resource.spi.endpoint.MessageEndpoint;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
@@ -73,6 +74,7 @@
private JMSConfiguration jmsConfig;
private Bus bus;
+ private EndpointInfo ei;
private DefaultMessageListenerContainer jmsListener;
private Collection<JMSContinuation> continuations =
new ConcurrentLinkedQueue<JMSContinuation>();
@@ -80,6 +82,7 @@
public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
super(b, getTargetReference(info, b), info);
this.bus = b;
+ this.ei = info;
this.jmsConfig = jmsConfig;
info.setProperty(OneWayProcessorInterceptor.USE_ORIGINAL_THREAD, Boolean.TRUE);
}
@@ -102,8 +105,8 @@
org.apache.cxf.common.i18n.Message msg =
new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_DESTINATION", LOG, name);
jmsConfig.ensureProperlyConfigured(msg);
- jmsListener = JMSFactory.createJmsListener(jmsConfig, this,
- jmsConfig.getTargetDestination(), null);
+ jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this,
+ jmsConfig.getTargetDestination());
}
public void deactivate() {
@@ -190,6 +193,12 @@
BusFactory.setThreadDefaultBus(bus);
+ MessageEndpoint ep = JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.get();
+ if (ep != null) {
+ inMessage.setContent(MessageEndpoint.class, ep);
+ JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.remove();
+ }
+
// handle the incoming message
incomingObserver.onMessage(inMessage);
} catch (SuspendedInvocationException ex) {
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=799724&r1=799723&r2=799724&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Fri Jul 31 19:54:36 2009
@@ -18,6 +18,7 @@
*/
package org.apache.cxf.transport.jms;
+import java.lang.reflect.Method;
import java.util.logging.Logger;
import javax.jms.ConnectionFactory;
@@ -26,9 +27,11 @@
import javax.jms.MessageListener;
import javax.jms.QueueSession;
import javax.jms.Session;
+import javax.jms.XAConnectionFactory;
import javax.naming.NamingException;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.service.model.EndpointInfo;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.JmsTemplate102;
@@ -112,23 +115,49 @@
}
return jmsTemplate;
}
-
/**
* Create and start listener using configuration information from jmsConfig. Uses
* resolveOrCreateDestination to determine the destination for the listener.
*
+ * @param ei the EndpointInfo for the listener
* @param jmsConfig configuration information
* @param listenerHandler object to be called when a message arrives
* @param destinationName null for temp dest or a destination name
- * @param conduitId prefix for the messageselector
* @return
*/
- public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
+ public static DefaultMessageListenerContainer createJmsListener(EndpointInfo ei,
+ JMSConfiguration jmsConfig,
MessageListener listenerHandler,
- String destinationName,
- String conduitId) {
- DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
- ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102();
+ String destinationName) {
+ DefaultMessageListenerContainer jmsListener = null;
+
+ if (jmsConfig.isUseJms11()) {
+ //Check to see if transport is being used in JCA RA with XA
+ Method method = ei.getProperty(JCATransactionalMessageListenerContainer.MDB_TRANSACTED_METHOD,
+ java.lang.reflect.Method.class);
+ if (method != null
+ &&
+ jmsConfig.getConnectionFactory() instanceof XAConnectionFactory) {
+ jmsListener = new JCATransactionalMessageListenerContainer(ei);
+ } else {
+ jmsListener = new DefaultMessageListenerContainer();
+ }
+ } else {
+ jmsListener = new DefaultMessageListenerContainer102();
+ }
+
+ return createJmsListener(jmsListener,
+ jmsConfig,
+ listenerHandler,
+ destinationName);
+ }
+
+ public static DefaultMessageListenerContainer createJmsListener(
+ DefaultMessageListenerContainer jmsListener,
+ JMSConfiguration jmsConfig,
+ MessageListener listenerHandler,
+ String destinationName) {
+
jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
@@ -161,15 +190,11 @@
if (jmsConfig.isAcceptMessagesWhileStopping()) {
jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping());
}
- /*String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
- if (conduitId != null && jmsConfig.isUseConduitIdSelector()) {
- jmsListener.setMessageSelector("JMSCorrelationID LIKE '"
- + staticSelectorPrefix
- + conduitId + "%'");
- } else if (staticSelectorPrefix.length() > 0) {
+ String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
+ if (staticSelectorPrefix.length() > 0) {
jmsListener.setMessageSelector("JMSCorrelationID LIKE '"
+ staticSelectorPrefix + "%'");
- }*/
+ }
if (jmsConfig.getDestinationResolver() != null) {
jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
}
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java?rev=799724&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java Fri Jul 31 19:54:36 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+
+import org.springframework.jms.connection.SingleConnectionFactory;
+
+/**
+ * {@link SingleConnectionFactory} variant whose connection and sessions are
+ * obtained through the XA interfaces of the target connection factory.
+ */
+public class XASingleConnectionFactory extends SingleConnectionFactory {
+
+    /**
+     * @param targetConnectionFactory the factory to wrap; it is cast to
+     *        {@link XAConnectionFactory} when the connection is created, so
+     *        it has to implement that interface as well
+     */
+    public XASingleConnectionFactory(ConnectionFactory targetConnectionFactory) {
+        super(targetConnectionFactory);
+    }
+
+    /**
+     * Creates the connection via {@code createXAConnection()} on the target
+     * connection factory.
+     *
+     * @return a new XA connection
+     * @throws JMSException if the target factory fails to create it
+     */
+    protected Connection doCreateConnection() throws JMSException {
+        return ((XAConnectionFactory)getTargetConnectionFactory()).createXAConnection();
+    }
+
+    /**
+     * Creates an XA session on the given connection.
+     *
+     * @param con  the connection, cast to {@link XAConnection}
+     * @param mode not used by this implementation
+     * @return a new XA session
+     * @throws JMSException if the session cannot be created
+     */
+    protected Session getSession(Connection con, Integer mode) throws JMSException {
+        return ((XAConnection)con).createXASession();
+    }
+
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java
------------------------------------------------------------------------------
svn:keywords = Rev Date