You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ni...@apache.org on 2008/09/05 08:04:05 UTC
svn commit: r692344 [1/2] - in /cxf/branches/2.1.x-fixes: ./
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/
rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/
Author: ningjiang
Date: Thu Sep 4 23:04:05 2008
New Revision: 692344
URL: http://svn.apache.org/viewvc?rev=692344&view=rev
Log:
Merged revisions 692329 via svnmerge from
https://svn.apache.org/repos/asf/cxf/trunk
........
r692329 | ningjiang | 2008-09-05 11:38:07 +0800 (Fri, 05 Sep 2008) | 1 line
CXF-1783 applied patch with thanks to Christian, also fixed some typos in the comments
........
Added:
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
- copied unchanged from r692329, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSListenerThread.java
- copied unchanged from r692329, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSListenerThread.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java
- copied unchanged from r692329, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java
Removed:
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java
Modified:
cxf/branches/2.1.x-fixes/ (props changed)
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java
Propchange: cxf/branches/2.1.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=692344&r1=692343&r2=692344&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Thu Sep 4 23:04:05 2008
@@ -23,42 +23,39 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.List;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
import javax.jms.Queue;
-import javax.jms.QueueSender;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.naming.NamingException;
+import javax.naming.Context;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.Configurable;
import org.apache.cxf.configuration.Configurer;
-import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
-import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
-public class JMSConduit extends AbstractConduit implements Configurable, JMSOnConnectCallback {
+/**
+ * JMSConduit is instantiated by the JMSTransportFactory which is selected by a client if the transport
+ * protocol starts with jms://. JMSConduit converts CXF Messages to JMS Messages and sends the request by using
+ * JMS topics or queues. If the Exchange is not oneway it then receives the response and converts it to a CXF
+ * Message. This is then provided in the Exchange and also sent to the incomingObserver.
+ */
+public class JMSConduit extends AbstractConduit implements Configurable, JMSExchangeSender {
protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
- private static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
+ static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
protected Destination targetDestination;
- protected Destination replyDestination;
protected JMSSessionFactory sessionFactory;
protected Bus bus;
protected EndpointInfo endpointInfo;
@@ -69,136 +66,191 @@
protected AddressType address;
protected SessionPoolType sessionPool;
+ private Queue replyDestination;
+
+ private Context context;
+
public JMSConduit(Bus b, EndpointInfo endpointInfo) {
this(b, endpointInfo, null);
}
public JMSConduit(Bus b, EndpointInfo endpointInfo, EndpointReferenceType target) {
super(target);
-
this.bus = b;
this.endpointInfo = endpointInfo;
this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
initConfig();
}
- // prepare the message for send out , not actually send out the message
- public void prepare(Message message) throws IOException {
- getLogger().log(Level.FINE, "JMSConduit send message");
-
- try {
- if (null == sessionFactory) {
- JMSProviderHub.connect(this, getJMSAddress(), getSessionPool());
- }
- } catch (JMSException jmsex) {
- getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
- throw new IOException(jmsex.toString());
- } catch (NamingException ne) {
- getLogger().log(Level.WARNING, "JMS connect failed with NamingException : ", ne);
- throw new IOException(ne.toString());
- }
+ private void initConfig() {
+ this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
+ this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+ this.clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
+ this.runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
+ ClientBehaviorPolicyType.class);
- if (sessionFactory == null) {
- throw new java.lang.IllegalStateException("JMSClientTransport not connected");
+ Configurer configurer = bus.getExtension(Configurer.class);
+ if (null != configurer) {
+ configurer.configureBean(this);
}
+ }
- try {
- boolean isOneWay = false;
- // test if the message is oneway message
- Exchange ex = message.getExchange();
- if (null != ex) {
- isOneWay = ex.isOneWay();
+ public JMSSessionFactory getOrCreateSessionFactory() {
+ if (this.sessionFactory == null) {
+ try {
+ this.context = JMSUtils.getInitialContext(address);
+ this.sessionFactory = JMSSessionFactory
+ .connect(getJMSAddress(), getSessionPool(), null);
+ this.targetDestination = JMSUtils.resolveRequestDestination(sessionFactory
+ .getInitialContext(), sessionFactory.getConnection(), address);
+ this.replyDestination = JMSUtils.resolveReplyDestination(context, sessionFactory
+ .getConnection(), address);
+ } catch (Exception jmsex) {
+ throw new RuntimeException("JMS connect failed: ", jmsex);
}
- // get the pooledSession with response expected
- PooledSession pooledSession = sessionFactory.get(!isOneWay);
- // put the PooledSession into the outMessage
- message.put(JMSConstants.JMS_POOLEDSESSION, pooledSession);
-
- } catch (JMSException jmsex) {
- throw new IOException(jmsex.getMessage());
}
-
- message.setContent(OutputStream.class, new JMSOutputStream(message));
-
- }
-
- public void close() {
- getLogger().log(Level.FINE, "JMSConduit closed ");
-
- // ensure resources held by session factory are released
- //
- if (sessionFactory != null) {
- sessionFactory.shutdown();
+ if (this.targetDestination == null) {
+ throw new RuntimeException("Failed to lookup or create requestDestination");
}
+ return this.sessionFactory;
}
- protected Logger getLogger() {
- return LOG;
+ // prepare the message for send out , not actually send out the message
+ public void prepare(Message message) throws IOException {
+ if (this.address == null || this.address.getJndiConnectionFactoryName() == null) {
+ throw new RuntimeException("Insufficient configuration for Conduit. "
+ + "Did you configure a <jms:conduit name=\""
+ + getBeanName() + "\"> and set the jndiConnectionFactoryName ?");
+ }
+ message.setContent(OutputStream.class, new JMSOutputStream(this,
+ message.getExchange(), isTextPayload()));
+ // After this step flow will continue in JMSOutputStream.doClose()
}
/**
- * Receive mechanics.
+ * Send the JMS Request out and if not oneWay receive the response
*
- * @param pooledSession the shared JMS resources
- * @param inMessage
- * @retrun the response buffer
+ * @param outMessage
+ * @param request
+ * @return inMessage
*/
- private Object receive(PooledSession pooledSession, Message outMessage, Message inMessage)
- throws JMSException {
-
- Object result = null;
+ public void sendExchange(Exchange exchange, Object request) {
+ LOG.log(Level.FINE, "JMSConduit send message");
- long timeout = getClientConfig().getClientReceiveTimeout();
+ sessionFactory = getOrCreateSessionFactory();
+ PooledSession pooledSession = null;
+ try {
+ pooledSession = sessionFactory.get();
+ Destination replyTo = null;
+ if (!exchange.isOneWay()) {
+ pooledSession.initConsumerAndReplyDestination(replyDestination);
+ replyTo = pooledSession.getReplyDestination();
+ }
- Long receiveTimeout = (Long)outMessage.get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
+ // TODO setting up the responseExpected
- if (receiveTimeout != null) {
- timeout = receiveTimeout.longValue();
+ // We don't want to send temp queue in
+ // replyTo header for oneway calls
+ if (exchange.isOneWay() && (getJMSAddress().getJndiReplyDestinationName() == null)) {
+ replyTo = null;
+ }
+ Message outMessage = exchange.getOutMessage();
+ if (outMessage == null) {
+ throw new RuntimeException("Exchange to be sent has no outMessage");
+ }
+ sendMessage(outMessage, request, pooledSession, replyTo);
+
+ if (!exchange.isOneWay()) {
+ long receiveTimeout = clientConfig.getClientReceiveTimeout();
+ Long messageReceiveTimeout = (Long)exchange.getOutMessage()
+ .get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
+ if (messageReceiveTimeout != null) {
+ receiveTimeout = messageReceiveTimeout.longValue();
+ }
+ Message inMessage = receiveResponse(pooledSession.consumer(), receiveTimeout);
+ exchange.setInMessage(inMessage);
+ incomingObserver.onMessage(inMessage);
+ }
+ } finally {
+ sessionFactory.recycle(pooledSession);
}
+ }
- javax.jms.Message jmsMessage = pooledSession.consumer().receive(timeout);
- getLogger().log(Level.FINE, "client received reply: ", jmsMessage);
+ private void sendMessage(Message outMessage, Object request, PooledSession pooledSession,
+ Destination replyTo) {
+ try {
+ String messageType = runtimePolicy.getMessageType().value();
+ javax.jms.Message jmsMessage;
+ jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(outMessage, request, messageType,
+ pooledSession.session(), replyTo,
+ pooledSession.getCorrelationID());
- if (jmsMessage != null) {
+ // Retrieve JMS QoS parameters from CXF message headers
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+ long ttl = JMSUtils.getTimeToLive(headers);
+ if (ttl <= 0) {
+ ttl = clientConfig.getMessageTimeToLive();
+ }
+ int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
+ int priority = JMSUtils.getJMSPriority(headers);
+
+ LOG.log(Level.FINE, "client sending request: ", jmsMessage);
+ JMSUtils.sendMessage(pooledSession.producer(), targetDestination, jmsMessage, ttl, deliveryMode,
+ priority);
+ } catch (JMSException e) {
+ throw new RuntimeException("Problem while sending JMS message", e);
+ }
+ }
+
+ private Message receiveResponse(MessageConsumer consumer, long receiveTimeout) {
+ // TODO if outMessage need to get the response
+ try {
+ Message inMessage = new MessageImpl();
+ // set the message header back to the incomeMessage
+ // inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
+ // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
+ byte[] response = null;
+ javax.jms.Message jmsMessage = consumer.receive(receiveTimeout);
+ if (jmsMessage == null) {
+ // TODO: Review what exception should we throw.
+ throw new JMSException("JMS receive timed out");
+ }
+ LOG.log(Level.FINE, "client received reply: ", jmsMessage);
JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
- result = JMSUtils.unmarshal(jmsMessage);
- return result;
- } else {
- String error = "JMSClientTransport.receive() timed out. No message available.";
- getLogger().log(Level.SEVERE, error);
- // TODO: Review what exception should we throw.
- throw new JMSException(error);
+ response = JMSUtils.retrievePayload(jmsMessage);
+ LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]");
+
+ // setup the inMessage response stream
+ inMessage.setContent(InputStream.class, new ByteArrayInputStream(response));
+ LOG.log(Level.FINE, "incoming observer is " + incomingObserver);
+ return inMessage;
+ } catch (JMSException e) {
+ throw new RuntimeException("Problem while receiving JMS message", e);
}
- }
- public void connected(Destination target, Destination reply, JMSSessionFactory factory) {
- this.targetDestination = target;
- this.replyDestination = reply;
- this.sessionFactory = factory;
}
- public String getBeanName() {
- return endpointInfo.getName().toString() + ".jms-conduit";
+ private boolean isTextPayload() {
+ return JMSConstants.TEXT_MESSAGE_TYPE.equals(runtimePolicy.getMessageType().value());
}
- private void initConfig() {
-
- this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
- this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
- this.clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
- this.runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
- ClientBehaviorPolicyType.class);
-
- Configurer configurer = bus.getExtension(Configurer.class);
- if (null != configurer) {
- configurer.configureBean(this);
+ public void close() {
+ getLogger().log(Level.FINE, "JMSConduit closed ");
+ // ensure resources held by session factory are released
+ if (sessionFactory != null) {
+ sessionFactory.shutdown();
}
}
- private boolean isTextPayload() {
- return JMSConstants.TEXT_MESSAGE_TYPE.equals(getRuntimePolicy().getMessageType().value());
+ protected Logger getLogger() {
+ return LOG;
+ }
+
+ public String getBeanName() {
+ return endpointInfo.getName().toString() + ".jms-conduit";
}
public AddressType getJMSAddress() {
@@ -232,190 +284,4 @@
public void setSessionPool(SessionPoolType sessionPool) {
this.sessionPool = sessionPool;
}
-
- private class JMSOutputStream extends CachedOutputStream {
- private Message outMessage;
- private javax.jms.Message jmsMessage;
- private PooledSession pooledSession;
- private boolean isOneWay;
-
- public JMSOutputStream(Message m) {
- outMessage = m;
- pooledSession = (PooledSession)outMessage.get(JMSConstants.JMS_POOLEDSESSION);
- }
-
- protected void doFlush() throws IOException {
- // do nothing here
- }
-
- protected void doClose() throws IOException {
- try {
- isOneWay = outMessage.getExchange().isOneWay();
- commitOutputMessage();
- if (!isOneWay) {
- handleResponse();
- }
- } catch (JMSException jmsex) {
- getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
- throw new IOException(jmsex.toString());
- } finally {
- sessionFactory.recycle(pooledSession);
- }
- }
-
- protected void onWrite() throws IOException {
-
- }
-
- private void commitOutputMessage() throws JMSException {
- javax.jms.Destination replyTo = pooledSession.destination();
- // TODO setting up the responseExpected
-
- // We don't want to send temp queue in
- // replyTo header for oneway calls
- if (isOneWay && (getJMSAddress().getJndiReplyDestinationName() == null)) {
- replyTo = null;
- }
-
- Object request = null;
- try {
- if (isTextPayload()) {
- StringBuilder builder = new StringBuilder(2048);
- this.writeCacheTo(builder);
- request = builder.toString();
- } else {
- request = getBytes();
- }
- } catch (IOException ex) {
- JMSException ex2 = new JMSException("Error creating request");
- ex2.setLinkedException(ex);
- throw ex2;
- }
- if (getLogger().isLoggable(Level.FINE)) {
- getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
- }
-
- jmsMessage = JMSUtils.marshal(request, pooledSession.session(), replyTo, getRuntimePolicy()
- .getMessageType().value());
-
- JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
- .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
- int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
- int priority = JMSUtils.getJMSPriority(headers);
- String correlationID = JMSUtils.getCorrelationId(headers);
- long ttl = JMSUtils.getTimeToLive(headers);
- if (ttl <= 0) {
- ttl = getClientConfig().getMessageTimeToLive();
- }
-
- JMSUtils.setMessageProperties(headers, jmsMessage);
- // ensure that the contentType is set to the out jms message header
- JMSUtils.setContentToProtocalHeader(outMessage);
- Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
- .get(Message.PROTOCOL_HEADERS));
- JMSUtils.addProtocolHeaders(jmsMessage, protHeaders);
- if (!isOneWay) {
- String id = pooledSession.getCorrelationID();
-
- if (id != null) {
- if (correlationID != null) {
- String error = "User cannot set JMSCorrelationID when "
- + "making a request/reply invocation using "
- + "a static replyTo Queue.";
- throw new JMSException(error);
- }
- correlationID = id;
- }
- }
-
- if (correlationID != null) {
- jmsMessage.setJMSCorrelationID(correlationID);
- } else {
- // No message correlation id is set. Whatever comeback will be accepted as responses.
- // We assume that it will only happen in case of the temp. reply queue.
- }
-
- getLogger().log(Level.FINE, "client sending request: ", jmsMessage);
- // getting Destination Style
- if (JMSUtils.isDestinationStyleQueue(address)) {
- QueueSender sender = (QueueSender)pooledSession.producer();
- sender.setTimeToLive(ttl);
- sender.send((Queue)targetDestination, jmsMessage, deliveryMode, priority, ttl);
- } else {
- TopicPublisher publisher = (TopicPublisher)pooledSession.producer();
- publisher.setTimeToLive(ttl);
- publisher.publish((Topic)targetDestination, jmsMessage, deliveryMode, priority, ttl);
- }
- }
-
- private void handleResponse() throws IOException {
- // REVISIT distinguish decoupled case or oneway call
- Object response = null;
-
- // TODO if outMessage need to get the response
- Message inMessage = new MessageImpl();
- outMessage.getExchange().setInMessage(inMessage);
- // set the message header back to the incomeMessage
- // inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
- // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
-
- try {
- response = receive(pooledSession, outMessage, inMessage);
- } catch (JMSException jmsex) {
- getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", jmsex);
- throw new IOException(jmsex.toString());
- }
-
- getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
-
- // setup the inMessage response stream
- byte[] bytes = null;
- if (response instanceof String) {
- String requestString = (String)response;
- bytes = requestString.getBytes();
- } else {
- bytes = (byte[])response;
- }
- inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
- getLogger().log(Level.FINE, "incoming observer is " + incomingObserver);
- incomingObserver.onMessage(inMessage);
- }
- }
-
- /**
- * Represented decoupled response endpoint.
- */
- protected class DecoupledDestination implements Destination {
- protected MessageObserver decoupledMessageObserver;
- private EndpointReferenceType address;
-
- DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) {
- address = ref;
- decoupledMessageObserver = incomingObserver;
- }
-
- public EndpointReferenceType getAddress() {
- return address;
- }
-
- public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr)
- throws IOException {
- // shouldn't be called on decoupled endpoint
- return null;
- }
-
- public void shutdown() {
- // TODO Auto-generated method stub
- }
-
- public synchronized void setMessageObserver(MessageObserver observer) {
- decoupledMessageObserver = observer;
- }
-
- public synchronized MessageObserver getMessageObserver() {
- return decoupledMessageObserver;
- }
- }
-
}
Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=692344&r1=692343&r2=692344&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Thu Sep 4 23:04:05 2008
@@ -30,16 +30,18 @@
import java.util.SimpleTimeZone;
import java.util.TimeZone;
import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.BytesMessage;
+import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
import javax.jms.Queue;
-import javax.jms.QueueSender;
+import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.naming.Context;
import javax.naming.NamingException;
import javax.xml.namespace.QName;
@@ -49,8 +51,7 @@
import org.apache.cxf.configuration.Configurable;
import org.apache.cxf.configuration.Configurer;
import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.helpers.IOUtils;
-import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
@@ -59,12 +60,13 @@
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.ConduitInitiator;
import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.workqueue.SynchronousExecutor;
import org.apache.cxf.workqueue.WorkQueueManager;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
-public class JMSDestination extends AbstractMultiplexDestination implements Configurable,
- JMSOnConnectCallback {
+public class JMSDestination extends AbstractMultiplexDestination implements Configurable, MessageListener,
+ JMSExchangeSender {
protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base";
@@ -75,16 +77,14 @@
protected AddressType address;
protected SessionPoolType sessionPool;
protected Destination targetDestination;
- protected Destination replyDestination;
+ protected Destination replyToDestination;
protected JMSSessionFactory sessionFactory;
protected Bus bus;
protected EndpointInfo endpointInfo;
protected String beanNameSuffix;
-
- final ConduitInitiator conduitInitiator;
-
- PooledSession listenerSession;
+ final ConduitInitiator conduitInitiator;
+ Session listenerSession;
JMSListenerThread listenerThread;
public JMSDestination(Bus b, ConduitInitiator ci, EndpointInfo info) throws IOException {
@@ -98,8 +98,16 @@
initConfig();
}
- protected Logger getLogger() {
- return LOG;
+ private void initConfig() {
+ this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
+ ServerBehaviorPolicyType.class);
+ this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
+ this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
+ this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+ Configurer configurer = bus.getExtension(Configurer.class);
+ if (null != configurer) {
+ configurer.configureBean(this);
+ }
}
/**
@@ -107,20 +115,57 @@
* @return the inbuilt backchannel
*/
protected Conduit getInbuiltBackChannel(Message inMessage) {
- return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), inMessage);
+ EndpointReferenceType anon = EndpointReferenceUtils.getAnonymousEndpointReference();
+ return new BackChannelConduit(this, anon, inMessage);
+ }
+
+ private Executor getExecutor(WorkQueueManager wqm, QName name) {
+ // Fallback if no WorkQueueManager
+ Executor executor = SynchronousExecutor.getInstance();
+ if (wqm != null) {
+ if (name != null) {
+ executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}"
+ + name.getLocalPart());
+ }
+ if (executor == null) {
+ executor = wqm.getNamedWorkQueue("jms");
+ }
+ if (executor == null) {
+ executor = wqm.getAutomaticWorkQueue();
+ }
+ }
+ return executor;
}
+ /**
+ * Initialize Sessionfactory, Initialize and start ListenerThread {@inheritDoc}
+ */
public void activate() {
- getLogger().log(Level.INFO, "JMSServerTransport activate().... ");
+ getLogger().log(Level.INFO, "JMSDestination activate().... ");
+
+ if (this.address == null || this.address.getJndiConnectionFactoryName() == null) {
+ throw new RuntimeException("Insufficient configuration for Destination. "
+ + "Did you configure a <jms:destination name=\"" + getBeanName()
+ + "\"> and set the jndiConnectionFactoryName ?");
+ }
try {
getLogger().log(Level.FINE, "establishing JMS connection");
- JMSProviderHub.connect(this, getJMSAddress(), getSessionPool(), serverConfig, runtimePolicy);
- // Get a non-pooled session.
- listenerSession = sessionFactory.get(targetDestination);
- listenerThread = new JMSListenerThread(listenerSession, getEndpointInfo() == null
- ? null : getEndpointInfo().getName());
- listenerThread.start();
+ sessionFactory = JMSSessionFactory.connect(getJMSAddress(), getSessionPool(), serverConfig);
+ Connection connection = sessionFactory.getConnection();
+ Context context = sessionFactory.getInitialContext();
+ this.targetDestination = JMSUtils.resolveRequestDestination(context, connection, address);
+ this.replyToDestination = JMSUtils.resolveRequestDestination(context, connection, address);
+ WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class);
+ QName name = null;
+ if (endpointInfo != null) {
+ name = endpointInfo.getName();
+ }
+ Executor executor = getExecutor(wqm, name);
+ String messageSelector = runtimePolicy.getMessageSelector();
+ String durableName = runtimePolicy.getDurableSubscriberName();
+ listenerThread = new JMSListenerThread(executor, this);
+ listenerThread.start(connection, targetDestination, messageSelector, durableName);
} catch (JMSException ex) {
getLogger().log(Level.SEVERE, "JMS connect failed with JMSException : ", ex);
} catch (NamingException nex) {
@@ -129,17 +174,10 @@
}
public void deactivate() {
- try {
- listenerSession.consumer().close();
- if (listenerThread != null) {
- listenerThread.join();
- }
- sessionFactory.shutdown();
- } catch (InterruptedException e) {
- // Do nothing here
- } catch (JMSException ex) {
- // Do nothing here
+ if (listenerThread != null) {
+ listenerThread.close();
}
+ sessionFactory.shutdown();
}
public void shutdown() {
@@ -148,21 +186,21 @@
}
public Queue getReplyToDestination(Message inMessage) throws JMSException, NamingException {
- Queue replyTo;
javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
// If WS-Addressing had set the replyTo header.
- if (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
- replyTo = sessionFactory.getQueueFromInitialContext((String)inMessage
- .get(JMSConstants.JMS_REBASED_REPLY_TO));
+ String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO);
+ if (replyToName != null) {
+ Context context = sessionFactory.getInitialContext();
+ return (Queue)context.lookup(replyToName);
+ } else if (message.getJMSReplyTo() != null) {
+ return (Queue)message.getJMSReplyTo();
} else {
- replyTo = (null != message.getJMSReplyTo())
- ? (Queue)message.getJMSReplyTo() : (Queue)replyDestination;
+ return (Queue)replyToDestination;
}
- return replyTo;
}
- public void setReplyCorrelationID(javax.jms.Message request,
- javax.jms.Message reply) throws JMSException {
+ public void setReplyCorrelationID(javax.jms.Message request, javax.jms.Message reply)
+ throws JMSException {
String correlationID = request.getJMSCorrelationID();
@@ -176,70 +214,127 @@
}
}
- protected void incoming(javax.jms.Message message) throws IOException {
+ /**
+ * Convert JMS message received by ListenerThread to CXF message and inform incomingObserver that a
+ * message was received. The observer will call the service and then send the response CXF message by
+ * using the BackChannelConduit
+ *
+ * @param message
+ * @throws IOException
+ */
+ public void onMessage(javax.jms.Message message) {
try {
getLogger().log(Level.FINE, "server received request: ", message);
- Object request = JMSUtils.unmarshal(message);
+ byte[] request = JMSUtils.retrievePayload(message);
getLogger().log(Level.FINE, "The Request Message is [ " + request + "]");
- byte[] bytes = null;
-
- if (message instanceof TextMessage) {
- String requestString = (String)request;
- getLogger().log(Level.FINE, "server received request: ", requestString);
- bytes = requestString.getBytes();
- } else {
- // Both ByteMessage and ObjectMessage would get unmarshalled to byte array.
- bytes = (byte[])request;
- }
- // get the message to be interceptor
+ // Build CXF message from JMS message
MessageImpl inMessage = new MessageImpl();
- inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
+ inMessage.setContent(InputStream.class, new ByteArrayInputStream(request));
JMSUtils.populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_REQUEST_HEADERS);
inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
-
inMessage.setDestination(this);
BusFactory.setThreadDefaultBus(bus);
// handle the incoming message
incomingObserver.onMessage(inMessage);
-
- } catch (JMSException jmsex) {
- // TODO: need to revisit for which exception should we throw.
- throw new IOException(jmsex.getMessage());
+ } catch (JMSException e) {
+ throw new RuntimeException("Error handling JMS message", e);
} finally {
BusFactory.setThreadDefaultBus(null);
}
}
- public void connected(javax.jms.Destination target,
- javax.jms.Destination reply,
- JMSSessionFactory factory) {
- this.targetDestination = target;
- this.replyDestination = reply;
- this.sessionFactory = factory;
- }
+ public void sendExchange(Exchange exchange, Object replyObj) {
+ Message inMessage = exchange.getInMessage();
+ Message outMessage = exchange.getOutMessage();
+ if (!JMSUtils.isDestinationStyleQueue(address)) {
+ // we will never receive a non-oneway invocation in pub-sub
+ // domain from CXF client - however a misbehaving pure JMS
+ // client could conceivably make such an invocation, in which
+ // case we silently discard the reply
+ getLogger().log(Level.WARNING, "discarding reply for non-oneway invocation ",
+ "with 'topic' destinationStyle");
+ return;
+ }
+ PooledSession replySession = null;
+ try {
+ // setup the reply message
+ replySession = sessionFactory.get();
+ javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
+ String msgType = null;
+ if (request instanceof TextMessage) {
+ msgType = JMSConstants.TEXT_MESSAGE_TYPE;
+ } else if (request instanceof BytesMessage) {
+ msgType = JMSConstants.BYTE_MESSAGE_TYPE;
+ } else {
+ msgType = JMSConstants.BINARY_MESSAGE_TYPE;
+ }
+ javax.jms.Message reply = JMSUtils
+ .createAndSetPayload(replyObj, replySession.session(), msgType);
- public String getBeanName() {
- return endpointInfo.getName().toString() + ".jms-destination";
- }
+ setReplyCorrelationID(request, reply);
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
+ JMSUtils.setMessageProperties(headers, reply);
+ // ensure that the contentType is set to the out jms message header
+ JMSUtils.setContentToProtocolHeader(outMessage);
+ Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
+ .get(Message.PROTOCOL_HEADERS));
+ JMSUtils.addProtocolHeaders(reply, protHeaders);
+ Destination replyTo = getReplyToDestination(inMessage);
- private void initConfig() {
- this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
- ServerBehaviorPolicyType.class);
- this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
- this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
- this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+ JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)inMessage
+ .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
- Configurer configurer = bus.getExtension(Configurer.class);
- if (null != configurer) {
- configurer.configureBean(this);
+ long timeToLive = 0;
+ if (request.getJMSExpiration() > 0) {
+ TimeZone tz = new SimpleTimeZone(0, "GMT");
+ Calendar cal = new GregorianCalendar(tz);
+ timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
+ }
+
+ if (timeToLive < 0) {
+ getLogger().log(Level.INFO, "Message time to live is already expired skipping response.");
+ return;
+ }
+
+ int deliveryMode = JMSUtils.getJMSDeliveryMode(inHeaders);
+ int priority = JMSUtils.getJMSPriority(inHeaders);
+ long ttl = JMSUtils.getTimeToLive(headers);
+ if (ttl <= 0) {
+ ttl = getServerConfig().getMessageTimeToLive();
+ }
+ if (ttl <= 0) {
+ ttl = timeToLive;
+ }
+ getLogger().log(Level.FINE, "send out the message!");
+ replySession.producer().send(replyTo, reply, deliveryMode, priority, ttl);
+
+ getLogger().log(Level.FINE, "just server sending reply: ", reply);
+ // Check the reply time limit Stream close will call for this
+ } catch (JMSException ex) {
+ getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);
+ throw new RuntimeException(ex.getMessage());
+ } catch (NamingException nex) {
+ getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);
+ throw new RuntimeException(nex.getMessage());
+ } finally {
+ sessionFactory.recycle(replySession);
}
}
+ protected Logger getLogger() {
+ return LOG;
+ }
+
+ public String getBeanName() {
+ return endpointInfo.getName().toString() + ".jms-destination";
+ }
+
public AddressType getJMSAddress() {
return address;
}
@@ -272,100 +367,16 @@
this.sessionPool = sessionPool;
}
- protected class JMSListenerThread extends Thread {
- private final PooledSession listenSession;
- private final QName name;
-
- public JMSListenerThread(PooledSession session, QName n) {
- listenSession = session;
- name = n;
- }
-
- public void run() {
- try {
- Executor executor = null;
- if (executor == null) {
- WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class);
- if (null != wqm) {
- if (name != null) {
- executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}"
- + name.getLocalPart());
- }
- if (executor == null) {
- executor = wqm.getNamedWorkQueue("jms");
- }
- if (executor == null) {
- executor = wqm.getAutomaticWorkQueue();
- }
- }
- }
- while (true) {
- javax.jms.Message message = listenSession.consumer().receive();
- if (message == null) {
- getLogger().log(Level.WARNING, "Null message received from message consumer.",
- " Exiting ListenerThread::run().");
- return;
- }
- while (message != null) {
- // REVISIT to get the thread pool
- // Executor executor = jmsDestination.callback.getExecutor();
- if (executor != null) {
- try {
- executor.execute(new JMSExecutor(message));
- message = null;
- } catch (RejectedExecutionException ree) {
- // FIXME - no room left on workqueue, what to do
- // for now, loop until it WILL fit on the queue,
- // although we could just dispatch on this thread.
- }
- } else {
- getLogger().log(Level.INFO, "handle the incoming message in listener thread");
- try {
- incoming(message);
- } catch (IOException ex) {
- getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
- }
- }
- message = null;
- }
- }
- } catch (JMSException jmsex) {
- jmsex.printStackTrace();
- getLogger().log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
- } catch (Throwable jmsex) {
- jmsex.printStackTrace();
- getLogger().log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
- }
- }
- }
-
- protected class JMSExecutor implements Runnable {
- javax.jms.Message message;
-
- JMSExecutor(javax.jms.Message m) {
- message = m;
- }
-
- public void run() {
- getLogger().log(Level.INFO, "run the incoming message in the threadpool");
- try {
- incoming(message);
- } catch (IOException ex) {
- // TODO: Decide what to do if we receive the exception.
- getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
- }
- }
-
- }
-
// this should deal with the cxf message
protected class BackChannelConduit extends AbstractConduit {
protected Message inMessage;
+ private JMSExchangeSender sender;
- BackChannelConduit(EndpointReferenceType ref, Message message) {
+ BackChannelConduit(JMSExchangeSender sender, EndpointReferenceType ref, Message message) {
super(ref);
inMessage = message;
+ this.sender = sender;
}
/**
@@ -385,14 +396,20 @@
*/
public void prepare(Message message) throws IOException {
// setup the message to be send back
- message.put(JMSConstants.JMS_REQUEST_MESSAGE, inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE));
+ javax.jms.Message jmsMessage = (javax.jms.Message)inMessage
+ .get(JMSConstants.JMS_REQUEST_MESSAGE);
+ message.put(JMSConstants.JMS_REQUEST_MESSAGE, jmsMessage);
if (!message.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)
&& inMessage.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)) {
message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage
.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
}
- message.setContent(OutputStream.class, new JMSOutputStream(inMessage, message));
+
+ Exchange exchange = inMessage.getExchange();
+ exchange.setOutMessage(message);
+ message.setContent(OutputStream.class, new JMSOutputStream(sender, exchange,
+ jmsMessage instanceof TextMessage));
}
protected Logger getLogger() {
@@ -400,147 +417,4 @@
}
}
- private class JMSOutputStream extends CachedOutputStream {
-
- private Message inMessage;
- private Message outMessage;
- private javax.jms.Message reply;
- private Queue replyTo;
- private QueueSender sender;
-
- // setup the ByteArrayStream
- public JMSOutputStream(Message m, Message o) {
- super();
- inMessage = m;
- outMessage = o;
- }
-
- // to prepear the message and get the send out message
- private void commitOutputMessage() throws IOException {
-
- JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
- .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
- javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
-
- PooledSession replySession = null;
-
- if (JMSUtils.isDestinationStyleQueue(address)) {
- try {
- // setup the reply message
- replyTo = getReplyToDestination(inMessage);
- replySession = sessionFactory.get(false);
- sender = (QueueSender)replySession.producer();
-
- String msgType = JMSConstants.TEXT_MESSAGE_TYPE;
- Object replyObj = null;
-
- if (request instanceof TextMessage) {
- StringBuilder builder = new StringBuilder();
- this.writeCacheTo(builder);
- replyObj = builder.toString();
- msgType = JMSConstants.TEXT_MESSAGE_TYPE;
- } else if (request instanceof BytesMessage) {
- replyObj = getBytes();
- msgType = JMSConstants.BYTE_MESSAGE_TYPE;
- } else {
- replyObj = getBytes();
- msgType = JMSConstants.BINARY_MESSAGE_TYPE;
- }
-
- if (getLogger().isLoggable(Level.FINE)) {
- getLogger().log(
- Level.FINE,
- "The response message is ["
- + (replyObj instanceof String ? (String)replyObj : IOUtils
- .newStringFromBytes((byte[])replyObj)) + "]");
- }
-
- reply = JMSUtils.marshal(replyObj, replySession.session(), null, msgType);
-
- setReplyCorrelationID(request, reply);
- JMSUtils.setMessageProperties(headers, reply);
- // ensure that the contentType is set to the out jms message header
- JMSUtils.setContentToProtocalHeader(outMessage);
- Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
- .get(Message.PROTOCOL_HEADERS));
- JMSUtils.addProtocolHeaders(reply, protHeaders);
-
- sendResponse();
-
- } catch (JMSException ex) {
- getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);
- throw new IOException(ex.getMessage());
- } catch (NamingException nex) {
- getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);
- throw new IOException(nex.getMessage());
- } finally {
- // house-keeping
- if (replySession != null) {
- sessionFactory.recycle(replySession);
- }
- }
- } else {
- // we will never receive a non-oneway invocation in pub-sub
- // domain from CXF client - however a mis-behaving pure JMS
- // client could conceivably make suce an invocation, in which
- // case we silently discard the reply
- getLogger().log(Level.WARNING, "discarding reply for non-oneway invocation ",
- "with 'topic' destinationStyle");
-
- }
-
- getLogger().log(Level.FINE, "just server sending reply: ", reply);
- // Check the reply time limit Stream close will call for this
-
- }
-
- private void sendResponse() throws JMSException {
- JMSMessageHeadersType headers = (JMSMessageHeadersType)inMessage
- .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
- javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
-
- int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
- int priority = JMSUtils.getJMSPriority(headers);
- long ttl = JMSUtils.getTimeToLive(headers);
-
- if (ttl <= 0) {
- ttl = getServerConfig().getMessageTimeToLive();
- }
-
- long timeToLive = 0;
- if (request.getJMSExpiration() > 0) {
- TimeZone tz = new SimpleTimeZone(0, "GMT");
- Calendar cal = new GregorianCalendar(tz);
- timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
- }
-
- if (timeToLive >= 0) {
- ttl = ttl > 0 ? ttl : timeToLive;
- getLogger().log(Level.FINE, "send out the message!");
- sender.send(replyTo, reply, deliveryMode, priority, ttl);
- } else {
- // the request message had dead
- getLogger().log(Level.INFO, "Message time to live is already expired skipping response.");
- }
- }
-
- @Override
- protected void doFlush() throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void doClose() throws IOException {
-
- commitOutputMessage();
- }
-
- @Override
- protected void onWrite() throws IOException {
- // Do nothing here
- }
-
- }
-
}
Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java?rev=692344&r1=692343&r2=692344&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java (original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java Thu Sep 4 23:04:05 2008
@@ -19,24 +19,18 @@
package org.apache.cxf.transport.jms;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Calendar;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
+import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
-import javax.jms.Topic;
import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
+import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -57,7 +51,7 @@
* unidentified producer to send the request message
* <p>
* server-side send: each dispatch of a twoway request requires relatively short-term exclusive use of a
- * session and an indentified producer (but not a consumer) - note that the session used for the recieve side
+ * session and an identified producer (but not a consumer) - note that the session used for the receive side
* cannot be re-used for the send, as MessageListener usage precludes any synchronous sends or receives on
* that session
* <p>
@@ -68,13 +62,13 @@
* strategies make sense ...
* <p>
* client-side: a SoftReference-based cache of send/receive sessions is maintained containing an aggregate of
- * a session, indentified producer, temporary reply destination & consumer for same
+ * a session, identified producer, temporary reply destination & consumer for same
* <p>
* server-side receive: as sessions cannot be usefully recycled, they are simply created on demand and closed
* when no longer required
* <p>
* server-side send: a SoftReference-based cache of send-only sessions is maintained containing an aggregate
- * of a session and an indentified producer
+ * of a session and an identified producer
* <p>
* In a pure client or pure server, only a single cache is ever populated. Where client and server logic is
* co-located, a client session retrieval for a twoway invocation checks the reply-capable cache first and
@@ -93,80 +87,118 @@
private int highWaterMark;
private final Context initialContext;
- private final Connection theConnection;
- private AbstractTwoStageCache<PooledSession> replyCapableSessionCache;
- private AbstractTwoStageCache<PooledSession> sendOnlySessionCache;
- private final Destination theReplyDestination;
- private final ServerBehaviorPolicyType runtimePolicy;
+ private ConnectionFactory connectionFactory;
+ private final Connection connection;
+ private AbstractTwoStageCache<PooledSession> sessionCache;
private boolean destinationIsQueue;
/**
* Constructor.
*
+ * @param connectionFactory
* @param connection the shared {Queue|Topic}Connection
*/
- public JMSSessionFactory(Connection connection, Destination replyDestination, Context context,
- boolean destinationIsQueue, SessionPoolType sessionPoolConfig,
- ServerBehaviorPolicyType runtimePolicy) {
- theConnection = connection;
- theReplyDestination = replyDestination;
+ protected JMSSessionFactory(ConnectionFactory connectionFactory, Connection connection,
+ Destination replyDestination, Context context, boolean destinationIsQueue,
+ SessionPoolType sessionPoolConfig) {
+ this.connectionFactory = connectionFactory;
+ this.connection = connection;
+ this.destinationIsQueue = destinationIsQueue;
initialContext = context;
- this.runtimePolicy = runtimePolicy;
lowWaterMark = sessionPoolConfig.getLowWaterMark();
highWaterMark = sessionPoolConfig.getHighWaterMark();
- this.destinationIsQueue = destinationIsQueue;
// create session caches (REVISIT sizes should be configurable)
- //
-
- if (destinationIsQueue) {
- // the reply capable cache is only required in the point-to-point
- // domain
- //
- replyCapableSessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark,
- 0, this) {
+ try {
+ sessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0, this) {
public final PooledSession create() throws JMSException {
- return createPointToPointReplyCapableSession();
+ return createSession();
}
};
+ sessionCache.populateCache();
+ } catch (Throwable t) {
+ LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
+ }
+ }
- try {
- replyCapableSessionCache.populateCache();
- } catch (Throwable t) {
- LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
- }
-
- // send-only cache for point-to-point oneway requests and replies
- //
- sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0,
- this) {
- public final PooledSession create() throws JMSException {
- return createPointToPointSendOnlySession();
- }
- };
+ /**
+ * Helper method to create a pooled session (queue or topic session, depending on destinationIsQueue).
+ *
+ * @return an appropriate pooled session
+ */
+ private PooledSession createSession() throws JMSException {
+ Session session = null;
+ if (destinationIsQueue) {
+ session = ((QueueConnection)connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ } else {
+ session = ((TopicConnection)connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ return new PooledSession(session, destinationIsQueue);
+ }
- try {
- sendOnlySessionCache.populateCache();
- } catch (Throwable t) {
- LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
+ /**
+ * This class acts as the hub of JMS provider usage, creating shared JMS Connections and providing access
+ * to a pool of JMS Sessions.
+ * <p>
+ * A new JMS connection is created for each port based <jms:address> - however it is likely that in
+ * practice the same JMS provider will be specified for each port, and hence the connection resources
+ * could be shared across ports.
+ * <p>
+ * For the moment this class is realized as just a container for static methods, but the intention is to
+ * support in future sharing of JMS resources across compatible ports.
+ */
+ protected static JMSSessionFactory connect(AddressType addrDetails, SessionPoolType sessionPoolConfig,
+ ServerConfig serverConfig) throws JMSException,
+ NamingException {
+
+ Context context = JMSUtils.getInitialContext(addrDetails);
+ ConnectionFactory connectionFactory;
+ Connection connection = null;
+
+ if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+ QueueConnectionFactory qcf = (QueueConnectionFactory)context.lookup(addrDetails
+ .getJndiConnectionFactoryName());
+ if (addrDetails.isSetConnectionUserName()) {
+ connection = qcf.createQueueConnection(addrDetails.getConnectionUserName(), addrDetails
+ .getConnectionPassword());
+ } else {
+ connection = qcf.createQueueConnection();
}
+ connectionFactory = qcf;
} else {
- // send-only cache for pub-sub oneway requests
- //
- sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0,
- this) {
- public final PooledSession create() throws JMSException {
- return createPubSubSession(true, false, null);
- }
- };
+ TopicConnectionFactory tcf = (TopicConnectionFactory)context.lookup(addrDetails
+ .getJndiConnectionFactoryName());
+ if (addrDetails.isSetConnectionUserName()) {
+ connection = tcf.createTopicConnection(addrDetails.getConnectionUserName(), addrDetails
+ .getConnectionPassword());
+ } else {
+ connection = tcf.createTopicConnection();
+ }
+ connectionFactory = tcf;
+ }
- try {
- sendOnlySessionCache.populateCache();
- } catch (Throwable t) {
- LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
+ if (null != serverConfig) {
+ String clientID = serverConfig.getDurableSubscriptionClientId();
+
+ if (clientID != null) {
+ connection.setClientID(clientID);
}
}
+ connection.start();
+ /*
+ * Destination requestDestination = resolveRequestDestination(context, connection, addrDetails);
+ */
+
+ Destination replyDestination = JMSUtils.resolveReplyDestination(context, connection, addrDetails);
+
+ // create session factory to manage session, reply destination,
+ // producer and consumer pooling
+ //
+ JMSSessionFactory sf = new JMSSessionFactory(connectionFactory, connection, replyDestination,
+ context, JMSUtils.isDestinationStyleQueue(addrDetails),
+ sessionPoolConfig);
+ return sf;
}
// --java.lang.Object Overrides----------------------------------------------
@@ -175,100 +207,35 @@
}
// --Methods-----------------------------------------------------------------
- protected Connection getConnection() {
- return theConnection;
- }
- public Queue getQueueFromInitialContext(String queueName) throws NamingException {
- return (Queue)initialContext.lookup(queueName);
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ protected Connection getConnection() {
+ return connection;
}
- public PooledSession get(boolean replyCapable) throws JMSException {
- return get(null, replyCapable);
+ public Context getInitialContext() {
+ return initialContext;
}
/**
* Retrieve a new or cached Session.
*
- * @param replyDest Destination name if coming from wsa:Header
- * @param replyCapable true iff the session is to be used to receive replies (implies client side twoway
- * invocation )
* @return a new or cached Session
*/
- public PooledSession get(Destination replyDest, boolean replyCapable) throws JMSException {
+ public PooledSession get() {
PooledSession ret = null;
synchronized (this) {
- if (replyCapable) {
- // first try reply capable cache
- //
- ret = replyCapableSessionCache.poll();
-
- if (ret == null) {
- // fall back to send only cache, creating temporary reply
- // queue and consumer
- //
- ret = sendOnlySessionCache.poll();
-
- if (ret != null) {
- QueueSession session = (QueueSession)ret.session();
- Queue destination = null;
- String selector = null;
-
- if (null != theReplyDestination || null != replyDest) {
- destination = null != replyDest ? (Queue)replyDest : (Queue)theReplyDestination;
-
- selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'";
- }
-
- if (destination == null) {
- // neither replyDestination not replyDest are present.
- destination = session.createTemporaryQueue();
- selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'";
- }
-
- ret.destination(destination);
- MessageConsumer consumer = session.createReceiver(destination, selector);
- ret.consumer(consumer);
- } else {
- // no pooled session available in either cache => create one in
- // in the reply capable cache
- //
- try {
- ret = replyCapableSessionCache.get();
- } catch (Throwable t) {
- // factory method may only throw JMSException
- //
- throw (JMSException)t;
- }
- }
- }
- } else {
- // first try send only cache
- //
- ret = sendOnlySessionCache.poll();
-
- if (ret == null) {
- // fall back to reply capable cache if one exists (only in the
- // point-to-point domain), ignoring temporary reply destination
- // and consumer
- //
- if (replyCapableSessionCache != null) {
- ret = replyCapableSessionCache.poll();
- }
-
- if (ret == null) {
- // no pooled session available in either cache => create one in
- // in the send only cache
- //
- try {
- ret = sendOnlySessionCache.get();
- } catch (Throwable t) {
- // factory method may only throw JMSException
- //
- throw (JMSException)t;
- }
- }
+ ret = sessionCache.poll();
+
+ if (ret == null) {
+ try {
+ ret = sessionCache.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
@@ -277,27 +244,6 @@
}
/**
- * Retrieve a new
- *
- * @param destination the target JMS queue or topic (non-null implies server receive side)
- * @return a new or cached Session
- */
- public PooledSession get(Destination destination) throws JMSException {
- PooledSession ret = null;
-
- // the destination is only specified on the server receive side,
- // in which case a new session is always created
- //
- if (destinationIsQueue) {
- ret = createPointToPointServerSession(destination);
- } else {
- ret = createPubSubSession(false, true, destination);
- }
-
- return ret;
- }
-
- /**
* Return a Session to the pool
*
* @param pooled_session the session to recycle
@@ -305,15 +251,14 @@
public void recycle(PooledSession pooledSession) {
// sessions used long-term by the server receive side are not cached,
// only non-null destinations are temp queues
- final boolean replyCapable = pooledSession.destination() != null;
+ if (pooledSession == null) {
+ return;
+ }
boolean discard = false;
+ // re-cache session, closing it if it cannot be accommodated
synchronized (this) {
- // re-cache session, closing if it cannot be it can be accomodated
- //
- discard = replyCapable
- ? (!replyCapableSessionCache.recycle(pooledSession)) : (!sendOnlySessionCache
- .recycle(pooledSession));
+ discard = !sessionCache.recycle(pooledSession);
}
if (discard) {
@@ -332,124 +277,21 @@
try {
PooledSession curr;
- if (replyCapableSessionCache != null) {
- curr = replyCapableSessionCache.poll();
- while (curr != null) {
- curr.close();
- curr = replyCapableSessionCache.poll();
- }
- }
-
- if (sendOnlySessionCache != null) {
- curr = sendOnlySessionCache.poll();
+ if (sessionCache != null) {
+ curr = sessionCache.poll();
while (curr != null) {
curr.close();
- curr = sendOnlySessionCache.poll();
+ curr = sessionCache.poll();
}
}
- theConnection.close();
+ connection.close();
} catch (JMSException e) {
LOG.log(Level.WARNING, "queue connection close failed: " + e);
}
// help GC
//
- replyCapableSessionCache = null;
- sendOnlySessionCache = null;
- }
-
- /**
- * Helper method to create a point-to-point pooled session.
- *
- * @param producer true iff producing
- * @param consumer true iff consuming
- * @param destination the target destination
- * @return an appropriate pooled session
- */
- PooledSession createPointToPointReplyCapableSession() throws JMSException {
- QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
- Session.AUTO_ACKNOWLEDGE);
- Destination destination = null;
- String selector = null;
-
- if (null != theReplyDestination) {
- destination = theReplyDestination;
-
- selector = "JMSCorrelationID = '" + generateUniqueSelector(session) + "'";
-
- } else {
- destination = session.createTemporaryQueue();
- }
-
- MessageConsumer consumer = session.createReceiver((Queue)destination, selector);
- return new PooledSession(session, destination, session.createSender(null), consumer);
- }
-
- /**
- * Helper method to create a point-to-point pooled session.
- *
- * @return an appropriate pooled session
- */
- PooledSession createPointToPointSendOnlySession() throws JMSException {
- QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
- Session.AUTO_ACKNOWLEDGE);
-
- return new PooledSession(session, null, session.createSender(null), null);
- }
-
- /**
- * Helper method to create a point-to-point pooled session for consumer only.
- *
- * @param destination the target destination
- * @return an appropriate pooled session
- */
- private PooledSession createPointToPointServerSession(Destination destination) throws JMSException {
- QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
- Session.AUTO_ACKNOWLEDGE);
-
- return new PooledSession(session, destination, session.createSender(null), session
- .createReceiver((Queue)destination, runtimePolicy.getMessageSelector()));
- }
-
- /**
- * Helper method to create a pub-sub pooled session.
- *
- * @param producer true iff producing
- * @param consumer true iff consuming
- * @param destination the target destination
- * @return an appropriate pooled session
- */
- PooledSession createPubSubSession(boolean producer, boolean consumer, Destination destination)
- throws JMSException {
- TopicSession session = ((TopicConnection)theConnection).createTopicSession(false,
- Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber sub = null;
- if (consumer) {
- String messageSelector = runtimePolicy.getMessageSelector();
- String durableName = runtimePolicy.getDurableSubscriberName();
- if (durableName != null) {
- sub = session
- .createDurableSubscriber((Topic)destination, durableName, messageSelector, false);
- } else {
- sub = session.createSubscriber((Topic)destination, messageSelector, false);
- }
- }
-
- return new PooledSession(session, null, producer ? session.createPublisher(null) : null, sub);
- }
-
- private String generateUniqueSelector(Object obj) {
- String host = "localhost";
-
- try {
- InetAddress addr = InetAddress.getLocalHost();
- host = addr.getHostName();
- } catch (UnknownHostException ukex) {
- // Default to localhost.
- }
-
- long time = Calendar.getInstance().getTimeInMillis();
- return host + "_" + System.getProperty("user.name") + "_" + obj + time;
+ sessionCache = null;
}
}
Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=692344&r1=692343&r2=692344&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Thu Sep 4 23:04:05 2008
@@ -30,12 +30,18 @@
import java.util.logging.Logger;
import javax.jms.BytesMessage;
+import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
@@ -136,7 +142,7 @@
* @param replyTo the ReplyTo destination if any
* @return a JMS of the appropriate type populated with the given payload
*/
- public static Message marshal(Object payload, Session session, Destination replyTo, String messageType)
+ public static Message createAndSetPayload(Object payload, Session session, String messageType)
throws JMSException {
Message message = null;
@@ -150,32 +156,26 @@
((ObjectMessage)message).setObject((byte[])payload);
}
- if (replyTo != null) {
- message.setJMSReplyTo(replyTo);
- }
-
return message;
}
/**
- * Unmarshal the payload of an incoming message.
+ * Extract the payload of an incoming message.
*
* @param message the incoming message
- * @return the unmarshalled message payload, either of type String or byte[] depending on payload type
+ * @return the message payload as byte[]
*/
- public static Object unmarshal(Message message) throws JMSException {
- Object ret = null;
+ public static byte[] retrievePayload(Message message) throws JMSException {
+ byte[] ret = null;
if (message instanceof TextMessage) {
- ret = ((TextMessage)message).getText();
+ ret = ((TextMessage)message).getText().getBytes();
} else if (message instanceof BytesMessage) {
- byte[] retBytes = new byte[(int)((BytesMessage)message).getBodyLength()];
- ((BytesMessage)message).readBytes(retBytes);
- ret = retBytes;
+ ret = new byte[(int)((BytesMessage)message).getBodyLength()];
+ ((BytesMessage)message).readBytes(ret);
} else {
ret = (byte[])((ObjectMessage)message).getObject();
}
-
return ret;
}
@@ -251,7 +251,7 @@
return headers;
}
- public static void setContentToProtocalHeader(org.apache.cxf.message.Message message) {
+ public static void setContentToProtocolHeader(org.apache.cxf.message.Message message) {
String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
Map<String, List<String>> headers = JMSUtils.getSetProtocolHeaders(message);
@@ -268,4 +268,107 @@
public static boolean isDestinationStyleQueue(AddressType address) {
return JMSConstants.JMS_QUEUE.equals(address.getDestinationStyle().value());
}
+
+ public static Message buildJMSMessageFromCXFMessage(org.apache.cxf.message.Message outMessage,
+ Object payload, String messageType, Session session,
+ Destination replyTo, String correlationId)
+ throws JMSException {
+ Message jmsMessage = JMSUtils.createAndSetPayload(payload, session, messageType);
+
+ if (replyTo != null) {
+ jmsMessage.setJMSReplyTo(replyTo);
+ }
+
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+
+ String correlationID = JMSUtils.getCorrelationId(headers);
+
+ JMSUtils.setMessageProperties(headers, jmsMessage);
+ // ensure that the contentType is set to the out jms message header
+ JMSUtils.setContentToProtocolHeader(outMessage);
+ Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
+ .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
+ JMSUtils.addProtocolHeaders(jmsMessage, protHeaders);
+ if (!outMessage.getExchange().isOneWay()) {
+ String id = correlationId;
+
+ if (id != null) {
+ if (correlationID != null) {
+ String error = "User cannot set JMSCorrelationID when "
+ + "making a request/reply invocation using " + "a static replyTo Queue.";
+ throw new JMSException(error);
+ }
+ correlationID = id;
+ }
+ }
+
+ if (correlationID != null) {
+ jmsMessage.setJMSCorrelationID(correlationID);
+ } else {
+ // No message correlation id is set. Whatever comeback will be accepted as responses.
+ // We assume that it will only happen in case of the temp. reply queue.
+ }
+ return jmsMessage;
+ }
+
+    /**
+     * Sends a JMS message through the given producer.
+     * <p>
+     * If the destination is a {@link Queue} the producer is used as a {@link QueueSender};
+     * otherwise it is used as a {@link TopicPublisher} and the destination is cast to
+     * {@link Topic}. In both cases the time to live is also set on the producer itself before
+     * the message is sent.
+     *
+     * @param producer the producer used for sending; must be castable to the sender/publisher type
+     *            matching the destination
+     * @param destination the destination the message is sent to
+     * @param jmsMessage the message to send
+     * @param timeToLive the time to live, set on the producer and passed to the send call
+     * @param deliveryMode the delivery mode passed to the send call
+     * @param priority the priority passed to the send call
+     * @throws JMSException if the JMS provider fails to send the message
+     */
+    public static void sendMessage(MessageProducer producer, Destination destination, Message jmsMessage,
+                                   long timeToLive, int deliveryMode, int priority) throws JMSException {
+        // NOTE(review): open question carried over from the original code - could all of this be
+        // replaced by producer.send(destination, jmsMessage, deliveryMode, priority, timeToLive)?
+        if (!(destination instanceof Queue)) {
+            // Anything that is not a queue is handled as a topic.
+            TopicPublisher topicPublisher = (TopicPublisher)producer;
+            topicPublisher.setTimeToLive(timeToLive);
+            topicPublisher.publish((Topic)destination, jmsMessage, deliveryMode, priority, timeToLive);
+            return;
+        }
+
+        QueueSender queueSender = (QueueSender)producer;
+        queueSender.setTimeToLive(timeToLive);
+        queueSender.send((Queue)destination, jmsMessage, deliveryMode, priority, timeToLive);
+    }
+
+ public static Destination resolveRequestDestination(Context context, Connection connection,
+ AddressType addrDetails) throws JMSException,
+ NamingException {
+ Destination requestDestination = null;
+ // see if jndiDestination is set
+ if (addrDetails.getJndiDestinationName() != null) {
+ requestDestination = (Destination)context.lookup(addrDetails.getJndiDestinationName());
+ }
+
+ // if no jndiDestination or it fails see if jmsDestination is set
+ // and try to create it.
+ if (requestDestination == null && addrDetails.getJmsDestinationName() != null) {
+ if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+ requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ .createQueue(addrDetails.getJmsDestinationName());
+ } else {
+ requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ .createTopic(addrDetails.getJmsDestinationName());
+ }
+ }
+ return requestDestination;
+ }
+
+    /**
+     * Resolves the static reply queue, if one is configured.
+     * <p>
+     * A reply destination is only considered when the address uses the queue destination style.
+     * A configured JNDI reply destination name is looked up first; if that yields nothing and a
+     * JMS reply destination name is configured, the queue is created through a short-lived session.
+     *
+     * @param context the naming context used for the JNDI lookup
+     * @param connection the connection used to create the helper session
+     * @param addrDetails the address configuration holding the reply destination names and style
+     * @return the reply queue, or {@code null} if the style is not queue or no reply destination
+     *         name is configured
+     * @throws NamingException if the JNDI lookup fails
+     * @throws JMSException if creating the session or the queue, or closing the session, fails
+     */
+    public static Queue resolveReplyDestination(Context context, Connection connection,
+                                                AddressType addrDetails) throws NamingException,
+        JMSException {
+        Queue replyDestination = null;
+
+        // Reply Destination is used (if present) only if the session is
+        // point-to-point session
+        if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+            if (addrDetails.getJndiReplyDestinationName() != null) {
+                replyDestination = (Queue)context.lookup(addrDetails.getJndiReplyDestinationName());
+            }
+            if (replyDestination == null && addrDetails.getJmsReplyDestinationName() != null) {
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                try {
+                    replyDestination = session.createQueue(addrDetails.getJmsReplyDestinationName());
+                } finally {
+                    // Close the helper session even when createQueue fails, so it is not leaked.
+                    session.close();
+                }
+            }
+        }
+        return replyDestination;
+    }
}