You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ni...@apache.org on 2008/09/05 08:04:05 UTC

svn commit: r692344 [1/2] - in /cxf/branches/2.1.x-fixes: ./ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/

Author: ningjiang
Date: Thu Sep  4 23:04:05 2008
New Revision: 692344

URL: http://svn.apache.org/viewvc?rev=692344&view=rev
Log:
Merged revisions 692329 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/trunk

........
  r692329 | ningjiang | 2008-09-05 11:38:07 +0800 (Fri, 05 Sep 2008) | 1 line
  
  CXF-1783 applied patch with thanks to Christian, also fixed some typos in the comments
........

Added:
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
      - copied unchanged from r692329, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSListenerThread.java
      - copied unchanged from r692329, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSListenerThread.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java
      - copied unchanged from r692329, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java
Removed:
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java
Modified:
    cxf/branches/2.1.x-fixes/   (props changed)
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java

Propchange: cxf/branches/2.1.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=692344&r1=692343&r2=692344&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Thu Sep  4 23:04:05 2008
@@ -23,42 +23,39 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.List;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
 import javax.jms.Queue;
-import javax.jms.QueueSender;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.naming.NamingException;
+import javax.naming.Context;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.Configurable;
 import org.apache.cxf.configuration.Configurer;
-import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractConduit;
-import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
-public class JMSConduit extends AbstractConduit implements Configurable, JMSOnConnectCallback {
+/**
+ * JMSConduit is instantiated by the JMSTransportFactory which is selected by a client if the transport
+ * protocol starts with jms:// JMSConduit converts CXF Messages to JMS Messages and sends the request by using
+ * JMS topics or queues. If the Exchange is not oneway it then receives the response and converts it to a CXF
+ * Message. This is then provided in the Exchange and also sent to the incomingObserver
+ */
+public class JMSConduit extends AbstractConduit implements Configurable, JMSExchangeSender {
 
     protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
 
-    private static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
+    static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
 
     protected Destination targetDestination;
-    protected Destination replyDestination;
     protected JMSSessionFactory sessionFactory;
     protected Bus bus;
     protected EndpointInfo endpointInfo;
@@ -69,136 +66,191 @@
     protected AddressType address;
     protected SessionPoolType sessionPool;
 
+    private Queue replyDestination;
+
+    private Context context;
+
     public JMSConduit(Bus b, EndpointInfo endpointInfo) {
         this(b, endpointInfo, null);
     }
 
     public JMSConduit(Bus b, EndpointInfo endpointInfo, EndpointReferenceType target) {
         super(target);
-
         this.bus = b;
         this.endpointInfo = endpointInfo;
         this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX;
         initConfig();
     }
 
-    // prepare the message for send out , not actually send out the message
-    public void prepare(Message message) throws IOException {
-        getLogger().log(Level.FINE, "JMSConduit send message");
-
-        try {
-            if (null == sessionFactory) {
-                JMSProviderHub.connect(this, getJMSAddress(), getSessionPool());
-            }
-        } catch (JMSException jmsex) {
-            getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
-            throw new IOException(jmsex.toString());
-        } catch (NamingException ne) {
-            getLogger().log(Level.WARNING, "JMS connect failed with NamingException : ", ne);
-            throw new IOException(ne.toString());
-        }
+    private void initConfig() {
+        this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
+        this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+        this.clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
+        this.runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
+                                                               ClientBehaviorPolicyType.class);
 
-        if (sessionFactory == null) {
-            throw new java.lang.IllegalStateException("JMSClientTransport not connected");
+        Configurer configurer = bus.getExtension(Configurer.class);
+        if (null != configurer) {
+            configurer.configureBean(this);
         }
+    }
 
-        try {
-            boolean isOneWay = false;
-            // test if the message is oneway message
-            Exchange ex = message.getExchange();
-            if (null != ex) {
-                isOneWay = ex.isOneWay();
+    public JMSSessionFactory getOrCreateSessionFactory() {
+        if (this.sessionFactory == null) {
+            try {
+                this.context = JMSUtils.getInitialContext(address);
+                this.sessionFactory = JMSSessionFactory
+                    .connect(getJMSAddress(), getSessionPool(), null);
+                this.targetDestination = JMSUtils.resolveRequestDestination(sessionFactory
+                    .getInitialContext(), sessionFactory.getConnection(), address);
+                this.replyDestination = JMSUtils.resolveReplyDestination(context, sessionFactory
+                    .getConnection(), address);
+            } catch (Exception jmsex) {
+                throw new RuntimeException("JMS connect failed: ", jmsex);
             }
-            // get the pooledSession with response expected
-            PooledSession pooledSession = sessionFactory.get(!isOneWay);
-            // put the PooledSession into the outMessage
-            message.put(JMSConstants.JMS_POOLEDSESSION, pooledSession);
-
-        } catch (JMSException jmsex) {
-            throw new IOException(jmsex.getMessage());
         }
-
-        message.setContent(OutputStream.class, new JMSOutputStream(message));
-
-    }
-
-    public void close() {
-        getLogger().log(Level.FINE, "JMSConduit closed ");
-
-        // ensure resources held by session factory are released
-        //
-        if (sessionFactory != null) {
-            sessionFactory.shutdown();
+        if (this.targetDestination == null) {
+            throw new RuntimeException("Failed to lookup or create requestDestination");
         }
+        return this.sessionFactory;
     }
 
-    protected Logger getLogger() {
-        return LOG;
+    // prepare the message for send out , not actually send out the message
+    public void prepare(Message message) throws IOException {
+        if (this.address == null || this.address.getJndiConnectionFactoryName() == null) {
+            throw new RuntimeException("Insufficient configuration for Conduit. "
+                + "Did you configure a <jms:conduit name=\"" 
+                + getBeanName() + "\"> and set the jndiConnectionFactoryName ?");
+        }
+        message.setContent(OutputStream.class, new JMSOutputStream(this, 
+            message.getExchange(), isTextPayload()));
+        // After this step flow will continue in JMSOutputStream.doClose()
     }
 
     /**
-     * Receive mechanics.
+     * Send the JMS Request out and if not oneWay receive the response
      * 
-     * @param pooledSession the shared JMS resources
-     * @param inMessage
-     * @retrun the response buffer
+     * @param outMessage
+     * @param request
+     * @return inMessage
      */
-    private Object receive(PooledSession pooledSession, Message outMessage, Message inMessage)
-        throws JMSException {
-
-        Object result = null;
+    public void sendExchange(Exchange exchange, Object request) {
+        LOG.log(Level.FINE, "JMSConduit send message");
 
-        long timeout = getClientConfig().getClientReceiveTimeout();
+        sessionFactory = getOrCreateSessionFactory();
+        PooledSession pooledSession = null;
+        try {
+            pooledSession = sessionFactory.get();
+            Destination replyTo = null;
+            if (!exchange.isOneWay()) {
+                pooledSession.initConsumerAndReplyDestination(replyDestination);
+                replyTo = pooledSession.getReplyDestination();
+            }
 
-        Long receiveTimeout = (Long)outMessage.get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
+            // TODO setting up the responseExpected
 
-        if (receiveTimeout != null) {
-            timeout = receiveTimeout.longValue();
+            // We don't want to send temp queue in
+            // replyTo header for oneway calls
+            if (exchange.isOneWay() && (getJMSAddress().getJndiReplyDestinationName() == null)) {
+                replyTo = null;
+            }
+            Message outMessage = exchange.getOutMessage();
+            if (outMessage == null) {
+                throw new RuntimeException("Exchange to be sent has no outMessage");
+            }
+            sendMessage(outMessage, request, pooledSession, replyTo);
+
+            if (!exchange.isOneWay()) {
+                long receiveTimeout = clientConfig.getClientReceiveTimeout();
+                Long messageReceiveTimeout = (Long)exchange.getOutMessage()
+                    .get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
+                if (messageReceiveTimeout != null) {
+                    receiveTimeout = messageReceiveTimeout.longValue();
+                }
+                Message inMessage = receiveResponse(pooledSession.consumer(), receiveTimeout);
+                exchange.setInMessage(inMessage);
+                incomingObserver.onMessage(inMessage);
+            }
+        } finally {
+            sessionFactory.recycle(pooledSession);
         }
+    }
 
-        javax.jms.Message jmsMessage = pooledSession.consumer().receive(timeout);
-        getLogger().log(Level.FINE, "client received reply: ", jmsMessage);
+    private void sendMessage(Message outMessage, Object request, PooledSession pooledSession,
+                             Destination replyTo) {
+        try {
+            String messageType = runtimePolicy.getMessageType().value();
+            javax.jms.Message jmsMessage;
+            jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(outMessage, request, messageType,
+                                                                pooledSession.session(), replyTo,
+                                                                pooledSession.getCorrelationID());
 
-        if (jmsMessage != null) {
+            // Retrieve JMS QoS parameters from CXF message headers
+            JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+                .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+            long ttl = JMSUtils.getTimeToLive(headers);
+            if (ttl <= 0) {
+                ttl = clientConfig.getMessageTimeToLive();
+            }
+            int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
+            int priority = JMSUtils.getJMSPriority(headers);
+
+            LOG.log(Level.FINE, "client sending request: ", jmsMessage);
+            JMSUtils.sendMessage(pooledSession.producer(), targetDestination, jmsMessage, ttl, deliveryMode,
+                                 priority);
+        } catch (JMSException e) {
+            throw new RuntimeException("Problem while sending JMS message", e);
+        }
+    }
+
+    private Message receiveResponse(MessageConsumer consumer, long receiveTimeout) {
+        // TODO if outMessage need to get the response
+        try {
+            Message inMessage = new MessageImpl();
+            // set the message header back to the incomeMessage
+            // inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
+            // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
 
+            byte[] response = null;
+            javax.jms.Message jmsMessage = consumer.receive(receiveTimeout);
+            if (jmsMessage == null) {
+                // TODO: Review what exception should we throw.
+                throw new JMSException("JMS receive timed out");
+            }
+            LOG.log(Level.FINE, "client received reply: ", jmsMessage);
             JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
-            result = JMSUtils.unmarshal(jmsMessage);
-            return result;
-        } else {
-            String error = "JMSClientTransport.receive() timed out. No message available.";
-            getLogger().log(Level.SEVERE, error);
-            // TODO: Review what exception should we throw.
-            throw new JMSException(error);
+            response = JMSUtils.retrievePayload(jmsMessage);
+            LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]");
+
+            // setup the inMessage response stream
+            inMessage.setContent(InputStream.class, new ByteArrayInputStream(response));
+            LOG.log(Level.FINE, "incoming observer is " + incomingObserver);
 
+            return inMessage;
+        } catch (JMSException e) {
+            throw new RuntimeException("Problem while receiving JMS message", e);
         }
-    }
 
-    public void connected(Destination target, Destination reply, JMSSessionFactory factory) {
-        this.targetDestination = target;
-        this.replyDestination = reply;
-        this.sessionFactory = factory;
     }
 
-    public String getBeanName() {
-        return endpointInfo.getName().toString() + ".jms-conduit";
+    private boolean isTextPayload() {
+        return JMSConstants.TEXT_MESSAGE_TYPE.equals(runtimePolicy.getMessageType().value());
     }
 
-    private void initConfig() {
-
-        this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
-        this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
-        this.clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class);
-        this.runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
-                                                               ClientBehaviorPolicyType.class);
-
-        Configurer configurer = bus.getExtension(Configurer.class);
-        if (null != configurer) {
-            configurer.configureBean(this);
+    public void close() {
+        getLogger().log(Level.FINE, "JMSConduit closed ");
+        // ensure resources held by session factory are released
+        if (sessionFactory != null) {
+            sessionFactory.shutdown();
         }
     }
 
-    private boolean isTextPayload() {
-        return JMSConstants.TEXT_MESSAGE_TYPE.equals(getRuntimePolicy().getMessageType().value());
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+    public String getBeanName() {
+        return endpointInfo.getName().toString() + ".jms-conduit";
     }
 
     public AddressType getJMSAddress() {
@@ -232,190 +284,4 @@
     public void setSessionPool(SessionPoolType sessionPool) {
         this.sessionPool = sessionPool;
     }
-
-    private class JMSOutputStream extends CachedOutputStream {
-        private Message outMessage;
-        private javax.jms.Message jmsMessage;
-        private PooledSession pooledSession;
-        private boolean isOneWay;
-
-        public JMSOutputStream(Message m) {
-            outMessage = m;
-            pooledSession = (PooledSession)outMessage.get(JMSConstants.JMS_POOLEDSESSION);
-        }
-
-        protected void doFlush() throws IOException {
-            // do nothing here
-        }
-
-        protected void doClose() throws IOException {
-            try {
-                isOneWay = outMessage.getExchange().isOneWay();
-                commitOutputMessage();
-                if (!isOneWay) {
-                    handleResponse();
-                }
-            } catch (JMSException jmsex) {
-                getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);
-                throw new IOException(jmsex.toString());
-            } finally {
-                sessionFactory.recycle(pooledSession);
-            }
-        }
-
-        protected void onWrite() throws IOException {
-
-        }
-
-        private void commitOutputMessage() throws JMSException {
-            javax.jms.Destination replyTo = pooledSession.destination();
-            // TODO setting up the responseExpected
-
-            // We don't want to send temp queue in
-            // replyTo header for oneway calls
-            if (isOneWay && (getJMSAddress().getJndiReplyDestinationName() == null)) {
-                replyTo = null;
-            }
-
-            Object request = null;
-            try {
-                if (isTextPayload()) {
-                    StringBuilder builder = new StringBuilder(2048);
-                    this.writeCacheTo(builder);
-                    request = builder.toString();
-                } else {
-                    request = getBytes();
-                }
-            } catch (IOException ex) {
-                JMSException ex2 = new JMSException("Error creating request");
-                ex2.setLinkedException(ex);
-                throw ex2;
-            }
-            if (getLogger().isLoggable(Level.FINE)) {
-                getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
-            }
-
-            jmsMessage = JMSUtils.marshal(request, pooledSession.session(), replyTo, getRuntimePolicy()
-                .getMessageType().value());
-
-            JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
-                .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
-            int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
-            int priority = JMSUtils.getJMSPriority(headers);
-            String correlationID = JMSUtils.getCorrelationId(headers);
-            long ttl = JMSUtils.getTimeToLive(headers);
-            if (ttl <= 0) {
-                ttl = getClientConfig().getMessageTimeToLive();
-            }
-
-            JMSUtils.setMessageProperties(headers, jmsMessage);
-            // ensure that the contentType is set to the out jms message header
-            JMSUtils.setContentToProtocalHeader(outMessage);
-            Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
-                .get(Message.PROTOCOL_HEADERS));
-            JMSUtils.addProtocolHeaders(jmsMessage, protHeaders);
-            if (!isOneWay) {
-                String id = pooledSession.getCorrelationID();
-
-                if (id != null) {
-                    if (correlationID != null) {
-                        String error = "User cannot set JMSCorrelationID when "
-                                       + "making a request/reply invocation using "
-                                       + "a static replyTo Queue.";
-                        throw new JMSException(error);
-                    }
-                    correlationID = id;
-                }
-            }
-
-            if (correlationID != null) {
-                jmsMessage.setJMSCorrelationID(correlationID);
-            } else {
-                // No message correlation id is set. Whatever comeback will be accepted as responses.
-                // We assume that it will only happen in case of the temp. reply queue.
-            }
-
-            getLogger().log(Level.FINE, "client sending request: ", jmsMessage);
-            // getting Destination Style
-            if (JMSUtils.isDestinationStyleQueue(address)) {
-                QueueSender sender = (QueueSender)pooledSession.producer();
-                sender.setTimeToLive(ttl);
-                sender.send((Queue)targetDestination, jmsMessage, deliveryMode, priority, ttl);
-            } else {
-                TopicPublisher publisher = (TopicPublisher)pooledSession.producer();
-                publisher.setTimeToLive(ttl);
-                publisher.publish((Topic)targetDestination, jmsMessage, deliveryMode, priority, ttl);
-            }
-        }
-
-        private void handleResponse() throws IOException {
-            // REVISIT distinguish decoupled case or oneway call
-            Object response = null;
-
-            // TODO if outMessage need to get the response
-            Message inMessage = new MessageImpl();
-            outMessage.getExchange().setInMessage(inMessage);
-            // set the message header back to the incomeMessage
-            // inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
-            // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
-
-            try {
-                response = receive(pooledSession, outMessage, inMessage);
-            } catch (JMSException jmsex) {
-                getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", jmsex);
-                throw new IOException(jmsex.toString());
-            }
-
-            getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
-
-            // setup the inMessage response stream
-            byte[] bytes = null;
-            if (response instanceof String) {
-                String requestString = (String)response;
-                bytes = requestString.getBytes();
-            } else {
-                bytes = (byte[])response;
-            }
-            inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
-            getLogger().log(Level.FINE, "incoming observer is " + incomingObserver);
-            incomingObserver.onMessage(inMessage);
-        }
-    }
-
-    /**
-     * Represented decoupled response endpoint.
-     */
-    protected class DecoupledDestination implements Destination {
-        protected MessageObserver decoupledMessageObserver;
-        private EndpointReferenceType address;
-
-        DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) {
-            address = ref;
-            decoupledMessageObserver = incomingObserver;
-        }
-
-        public EndpointReferenceType getAddress() {
-            return address;
-        }
-
-        public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr)
-            throws IOException {
-            // shouldn't be called on decoupled endpoint
-            return null;
-        }
-
-        public void shutdown() {
-            // TODO Auto-generated method stub
-        }
-
-        public synchronized void setMessageObserver(MessageObserver observer) {
-            decoupledMessageObserver = observer;
-        }
-
-        public synchronized MessageObserver getMessageObserver() {
-            return decoupledMessageObserver;
-        }
-    }
-
 }

Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=692344&r1=692343&r2=692344&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Thu Sep  4 23:04:05 2008
@@ -30,16 +30,18 @@
 import java.util.SimpleTimeZone;
 import java.util.TimeZone;
 import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jms.BytesMessage;
+import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.MessageListener;
 import javax.jms.Queue;
-import javax.jms.QueueSender;
+import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.naming.Context;
 import javax.naming.NamingException;
 import javax.xml.namespace.QName;
 
@@ -49,8 +51,7 @@
 import org.apache.cxf.configuration.Configurable;
 import org.apache.cxf.configuration.Configurer;
 import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.helpers.IOUtils;
-import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
@@ -59,12 +60,13 @@
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.ConduitInitiator;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.workqueue.SynchronousExecutor;
 import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
 
-public class JMSDestination extends AbstractMultiplexDestination implements Configurable,
-    JMSOnConnectCallback {
+public class JMSDestination extends AbstractMultiplexDestination implements Configurable, MessageListener,
+    JMSExchangeSender {
 
     protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base";
 
@@ -75,16 +77,14 @@
     protected AddressType address;
     protected SessionPoolType sessionPool;
     protected Destination targetDestination;
-    protected Destination replyDestination;
+    protected Destination replyToDestination;
     protected JMSSessionFactory sessionFactory;
     protected Bus bus;
     protected EndpointInfo endpointInfo;
     protected String beanNameSuffix;
-    
-    final ConduitInitiator conduitInitiator;
-
 
-    PooledSession listenerSession;
+    final ConduitInitiator conduitInitiator;
+    Session listenerSession;
     JMSListenerThread listenerThread;
 
     public JMSDestination(Bus b, ConduitInitiator ci, EndpointInfo info) throws IOException {
@@ -98,8 +98,16 @@
         initConfig();
     }
 
-    protected Logger getLogger() {
-        return LOG;
+    private void initConfig() {
+        this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
+                                                               ServerBehaviorPolicyType.class);
+        this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
+        this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
+        this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+        Configurer configurer = bus.getExtension(Configurer.class);
+        if (null != configurer) {
+            configurer.configureBean(this);
+        }
     }
 
     /**
@@ -107,20 +115,57 @@
      * @return the inbuilt backchannel
      */
     protected Conduit getInbuiltBackChannel(Message inMessage) {
-        return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), inMessage);
+        EndpointReferenceType anon = EndpointReferenceUtils.getAnonymousEndpointReference();
+        return new BackChannelConduit(this, anon, inMessage);
+    }
+
+    private Executor getExecutor(WorkQueueManager wqm, QName name) {
+        // Fallback if no Workqueuemanager
+        Executor executor = SynchronousExecutor.getInstance();
+        if (wqm != null) {
+            if (name != null) {
+                executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}"
+                                                 + name.getLocalPart());
+            }
+            if (executor == null) {
+                executor = wqm.getNamedWorkQueue("jms");
+            }
+            if (executor == null) {
+                executor = wqm.getAutomaticWorkQueue();
+            }
+        }
+        return executor;
     }
 
+    /**
+     * Initialize Sessionfactory, Initialize and start ListenerThread {@inheritDoc}
+     */
     public void activate() {
-        getLogger().log(Level.INFO, "JMSServerTransport activate().... ");
+        getLogger().log(Level.INFO, "JMSDestination activate().... ");
+
+        if (this.address == null || this.address.getJndiConnectionFactoryName() == null) {
+            throw new RuntimeException("Insufficient configuration for Destination. "
+                                       + "Did you configure a <jms:destination name=\"" + getBeanName()
+                                       + "\"> and set the jndiConnectionFactoryName ?");
+        }
 
         try {
             getLogger().log(Level.FINE, "establishing JMS connection");
-            JMSProviderHub.connect(this, getJMSAddress(), getSessionPool(), serverConfig, runtimePolicy);
-            // Get a non-pooled session.
-            listenerSession = sessionFactory.get(targetDestination);
-            listenerThread = new JMSListenerThread(listenerSession, getEndpointInfo() == null
-                ? null : getEndpointInfo().getName());
-            listenerThread.start();
+            sessionFactory = JMSSessionFactory.connect(getJMSAddress(), getSessionPool(), serverConfig);
+            Connection connection = sessionFactory.getConnection();
+            Context context = sessionFactory.getInitialContext();
+            this.targetDestination = JMSUtils.resolveRequestDestination(context, connection, address);
+            this.replyToDestination = JMSUtils.resolveRequestDestination(context, connection, address);
+            WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class);
+            QName name = null;
+            if (endpointInfo != null) {
+                name = endpointInfo.getName();
+            }
+            Executor executor = getExecutor(wqm, name);
+            String messageSelector = runtimePolicy.getMessageSelector();
+            String durableName = runtimePolicy.getDurableSubscriberName();
+            listenerThread = new JMSListenerThread(executor, this);
+            listenerThread.start(connection, targetDestination, messageSelector, durableName);
         } catch (JMSException ex) {
             getLogger().log(Level.SEVERE, "JMS connect failed with JMSException : ", ex);
         } catch (NamingException nex) {
@@ -129,17 +174,10 @@
     }
 
     public void deactivate() {
-        try {
-            listenerSession.consumer().close();
-            if (listenerThread != null) {
-                listenerThread.join();
-            }
-            sessionFactory.shutdown();
-        } catch (InterruptedException e) {
-            // Do nothing here
-        } catch (JMSException ex) {
-            // Do nothing here
+        if (listenerThread != null) {
+            listenerThread.close();
         }
+        sessionFactory.shutdown();
     }
 
     public void shutdown() {
@@ -148,21 +186,21 @@
     }
 
     public Queue getReplyToDestination(Message inMessage) throws JMSException, NamingException {
-        Queue replyTo;
         javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
         // If WS-Addressing had set the replyTo header.
-        if (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
-            replyTo = sessionFactory.getQueueFromInitialContext((String)inMessage
-                .get(JMSConstants.JMS_REBASED_REPLY_TO));
+        String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO);
+        if (replyToName != null) {
+            Context context = sessionFactory.getInitialContext();
+            return (Queue)context.lookup(replyToName);
+        } else if (message.getJMSReplyTo() != null) {
+            return (Queue)message.getJMSReplyTo();
         } else {
-            replyTo = (null != message.getJMSReplyTo())
-                ? (Queue)message.getJMSReplyTo() : (Queue)replyDestination;
+            return (Queue)replyToDestination;
         }
-        return replyTo;
     }
 
-    public void setReplyCorrelationID(javax.jms.Message request, 
-                                      javax.jms.Message reply) throws JMSException {
+    public void setReplyCorrelationID(javax.jms.Message request, javax.jms.Message reply)
+        throws JMSException {
 
         String correlationID = request.getJMSCorrelationID();
 
@@ -176,70 +214,127 @@
         }
     }
 
-    protected void incoming(javax.jms.Message message) throws IOException {
+    /**
+     * Convert JMS message received by ListenerThread to CXF message and inform incomingObserver that a
+     * message was received. The observer will call the service and then send the response CXF message by
+     * using the BackChannelConduit
+     * 
+     * @param message the JMS message received by the listener
+     * @throws RuntimeException if handling the message raises a JMSException
+     */
+    public void onMessage(javax.jms.Message message) {
         try {
             getLogger().log(Level.FINE, "server received request: ", message);
 
-            Object request = JMSUtils.unmarshal(message);
+            byte[] request = JMSUtils.retrievePayload(message);
             getLogger().log(Level.FINE, "The Request Message is [ " + request + "]");
-            byte[] bytes = null;
-
-            if (message instanceof TextMessage) {
-                String requestString = (String)request;
-                getLogger().log(Level.FINE, "server received request: ", requestString);
-                bytes = requestString.getBytes();
-            } else {
-                // Both ByteMessage and ObjectMessage would get unmarshalled to byte array.
-                bytes = (byte[])request;
-            }
 
-            // get the message to be interceptor
+            // Build CXF message from JMS message
             MessageImpl inMessage = new MessageImpl();
-            inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
+            inMessage.setContent(InputStream.class, new ByteArrayInputStream(request));
             JMSUtils.populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_REQUEST_HEADERS);
             inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
             inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
-
             inMessage.setDestination(this);
 
             BusFactory.setThreadDefaultBus(bus);
 
             // handle the incoming message
             incomingObserver.onMessage(inMessage);
-
-        } catch (JMSException jmsex) {
-            // TODO: need to revisit for which exception should we throw.
-            throw new IOException(jmsex.getMessage());
+        } catch (JMSException e) {
+            throw new RuntimeException("Error handling JMS message", e);
         } finally {
             BusFactory.setThreadDefaultBus(null);
         }
     }
 
-    public void connected(javax.jms.Destination target, 
-                          javax.jms.Destination reply, 
-                          JMSSessionFactory factory) {
-        this.targetDestination = target;
-        this.replyDestination = reply;
-        this.sessionFactory = factory;
-    }
+    public void sendExchange(Exchange exchange, Object replyObj) {
+        Message inMessage = exchange.getInMessage();
+        Message outMessage = exchange.getOutMessage();
+        if (!JMSUtils.isDestinationStyleQueue(address)) {
+            // we will never receive a non-oneway invocation in pub-sub
+            // domain from CXF client - however a mis-behaving pure JMS
+            // client could conceivably make such an invocation, in which
+            // case we silently discard the reply
+            getLogger().log(Level.WARNING, "discarding reply for non-oneway invocation ",
+                            "with 'topic' destinationStyle");
+            return;
+        }
+        PooledSession replySession = null;
+        try {
+            // setup the reply message
+            replySession = sessionFactory.get();
+            javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
+            String msgType = null;
+            if (request instanceof TextMessage) {
+                msgType = JMSConstants.TEXT_MESSAGE_TYPE;
+            } else if (request instanceof BytesMessage) {
+                msgType = JMSConstants.BYTE_MESSAGE_TYPE;
+            } else {
+                msgType = JMSConstants.BINARY_MESSAGE_TYPE;
+            }
+            javax.jms.Message reply = JMSUtils
+                .createAndSetPayload(replyObj, replySession.session(), msgType);
 
-    public String getBeanName() {
-        return endpointInfo.getName().toString() + ".jms-destination";
-    }
+            setReplyCorrelationID(request, reply);
+            JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+                .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
+            JMSUtils.setMessageProperties(headers, reply);
+            // ensure that the contentType is set to the out jms message header
+            JMSUtils.setContentToProtocolHeader(outMessage);
+            Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
+                .get(Message.PROTOCOL_HEADERS));
+            JMSUtils.addProtocolHeaders(reply, protHeaders);
+            Destination replyTo = getReplyToDestination(inMessage);
 
-    private void initConfig() {
-        this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
-                                                               ServerBehaviorPolicyType.class);
-        this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
-        this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
-        this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+            JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)inMessage
+                .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
 
-        Configurer configurer = bus.getExtension(Configurer.class);
-        if (null != configurer) {
-            configurer.configureBean(this);
+            long timeToLive = 0;
+            if (request.getJMSExpiration() > 0) {
+                TimeZone tz = new SimpleTimeZone(0, "GMT");
+                Calendar cal = new GregorianCalendar(tz);
+                timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
+            }
+
+            if (timeToLive < 0) {
+                getLogger().log(Level.INFO, "Message time to live is already expired skipping response.");
+                return;
+            }
+
+            int deliveryMode = JMSUtils.getJMSDeliveryMode(inHeaders);
+            int priority = JMSUtils.getJMSPriority(inHeaders);
+            long ttl = JMSUtils.getTimeToLive(headers);
+            if (ttl <= 0) {
+                ttl = getServerConfig().getMessageTimeToLive();
+            }
+            if (ttl <= 0) {
+                ttl = timeToLive;
+            }
+            getLogger().log(Level.FINE, "send out the message!");
+            replySession.producer().send(replyTo, reply, deliveryMode, priority, ttl);
+
+            getLogger().log(Level.FINE, "just server sending reply: ", reply);
+            // Check the reply time limit Stream close will call for this
+        } catch (JMSException ex) {
+            getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);
+            throw new RuntimeException(ex.getMessage());
+        } catch (NamingException nex) {
+            getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);
+            throw new RuntimeException(nex.getMessage());
+        } finally {
+            sessionFactory.recycle(replySession);
         }
     }
 
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+    public String getBeanName() {
+        return endpointInfo.getName().toString() + ".jms-destination";
+    }
+
     public AddressType getJMSAddress() {
         return address;
     }
@@ -272,100 +367,16 @@
         this.sessionPool = sessionPool;
     }
 
-    protected class JMSListenerThread extends Thread {
-        private final PooledSession listenSession;
-        private final QName name;
-
-        public JMSListenerThread(PooledSession session, QName n) {
-            listenSession = session;
-            name = n;
-        }
-
-        public void run() {
-            try {
-                Executor executor = null;
-                if (executor == null) {
-                    WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class);
-                    if (null != wqm) {
-                        if (name != null) {
-                            executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}"
-                                                             + name.getLocalPart());
-                        }
-                        if (executor == null) {
-                            executor = wqm.getNamedWorkQueue("jms");
-                        }
-                        if (executor == null) {
-                            executor = wqm.getAutomaticWorkQueue();
-                        }
-                    }
-                }
-                while (true) {
-                    javax.jms.Message message = listenSession.consumer().receive();
-                    if (message == null) {
-                        getLogger().log(Level.WARNING, "Null message received from message consumer.",
-                                        " Exiting ListenerThread::run().");
-                        return;
-                    }
-                    while (message != null) {
-                        // REVISIT to get the thread pool
-                        // Executor executor = jmsDestination.callback.getExecutor();
-                        if (executor != null) {
-                            try {
-                                executor.execute(new JMSExecutor(message));
-                                message = null;
-                            } catch (RejectedExecutionException ree) {
-                                // FIXME - no room left on workqueue, what to do
-                                // for now, loop until it WILL fit on the queue,
-                                // although we could just dispatch on this thread.
-                            }
-                        } else {
-                            getLogger().log(Level.INFO, "handle the incoming message in listener thread");
-                            try {
-                                incoming(message);
-                            } catch (IOException ex) {
-                                getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
-                            }
-                        }
-                        message = null;
-                    }
-                }
-            } catch (JMSException jmsex) {
-                jmsex.printStackTrace();
-                getLogger().log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
-            } catch (Throwable jmsex) {
-                jmsex.printStackTrace();
-                getLogger().log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
-            }
-        }
-    }
-
-    protected class JMSExecutor implements Runnable {
-        javax.jms.Message message;
-
-        JMSExecutor(javax.jms.Message m) {
-            message = m;
-        }
-
-        public void run() {
-            getLogger().log(Level.INFO, "run the incoming message in the threadpool");
-            try {
-                incoming(message);
-            } catch (IOException ex) {
-                // TODO: Decide what to do if we receive the exception.
-                getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
-            }
-        }
-
-    }
-
     // this should deal with the cxf message
     protected class BackChannelConduit extends AbstractConduit {
 
         protected Message inMessage;
+        private JMSExchangeSender sender;
 
-        BackChannelConduit(EndpointReferenceType ref, Message message) {
+        BackChannelConduit(JMSExchangeSender sender, EndpointReferenceType ref, Message message) {
             super(ref);
             inMessage = message;
+            this.sender = sender;
         }
 
         /**
@@ -385,14 +396,20 @@
          */
         public void prepare(Message message) throws IOException {
             // setup the message to be send back
-            message.put(JMSConstants.JMS_REQUEST_MESSAGE, inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE));
+            javax.jms.Message jmsMessage = (javax.jms.Message)inMessage
+                .get(JMSConstants.JMS_REQUEST_MESSAGE);
+            message.put(JMSConstants.JMS_REQUEST_MESSAGE, jmsMessage);
 
             if (!message.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)
                 && inMessage.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)) {
                 message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage
                     .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
             }
-            message.setContent(OutputStream.class, new JMSOutputStream(inMessage, message));
+
+            Exchange exchange = inMessage.getExchange();
+            exchange.setOutMessage(message);
+            message.setContent(OutputStream.class, new JMSOutputStream(sender, exchange,
+                                                                       jmsMessage instanceof TextMessage));
         }
 
         protected Logger getLogger() {
@@ -400,147 +417,4 @@
         }
     }
 
-    private class JMSOutputStream extends CachedOutputStream {
-
-        private Message inMessage;
-        private Message outMessage;
-        private javax.jms.Message reply;
-        private Queue replyTo;
-        private QueueSender sender;
-
-        // setup the ByteArrayStream
-        public JMSOutputStream(Message m, Message o) {
-            super();
-            inMessage = m;
-            outMessage = o;
-        }
-
-        // to prepear the message and get the send out message
-        private void commitOutputMessage() throws IOException {
-
-            JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
-                .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
-            javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
-
-            PooledSession replySession = null;
-
-            if (JMSUtils.isDestinationStyleQueue(address)) {
-                try {
-                    // setup the reply message
-                    replyTo = getReplyToDestination(inMessage);
-                    replySession = sessionFactory.get(false);
-                    sender = (QueueSender)replySession.producer();
-
-                    String msgType = JMSConstants.TEXT_MESSAGE_TYPE;
-                    Object replyObj = null;
-
-                    if (request instanceof TextMessage) {
-                        StringBuilder builder = new StringBuilder();
-                        this.writeCacheTo(builder);
-                        replyObj = builder.toString();
-                        msgType = JMSConstants.TEXT_MESSAGE_TYPE;
-                    } else if (request instanceof BytesMessage) {
-                        replyObj = getBytes();
-                        msgType = JMSConstants.BYTE_MESSAGE_TYPE;
-                    } else {
-                        replyObj = getBytes();
-                        msgType = JMSConstants.BINARY_MESSAGE_TYPE;
-                    }
-
-                    if (getLogger().isLoggable(Level.FINE)) {
-                        getLogger().log(
-                                        Level.FINE,
-                                        "The response message is ["
-                                            + (replyObj instanceof String ? (String)replyObj : IOUtils
-                                                .newStringFromBytes((byte[])replyObj)) + "]");
-                    }
-
-                    reply = JMSUtils.marshal(replyObj, replySession.session(), null, msgType);
-
-                    setReplyCorrelationID(request, reply);
-                    JMSUtils.setMessageProperties(headers, reply);
-                    // ensure that the contentType is set to the out jms message header
-                    JMSUtils.setContentToProtocalHeader(outMessage);
-                    Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage
-                        .get(Message.PROTOCOL_HEADERS));
-                    JMSUtils.addProtocolHeaders(reply, protHeaders);
-
-                    sendResponse();
-
-                } catch (JMSException ex) {
-                    getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);
-                    throw new IOException(ex.getMessage());
-                } catch (NamingException nex) {
-                    getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);
-                    throw new IOException(nex.getMessage());
-                } finally {
-                    // house-keeping
-                    if (replySession != null) {
-                        sessionFactory.recycle(replySession);
-                    }
-                }
-            } else {
-                // we will never receive a non-oneway invocation in pub-sub
-                // domain from CXF client - however a mis-behaving pure JMS
-                // client could conceivably make suce an invocation, in which
-                // case we silently discard the reply
-                getLogger().log(Level.WARNING, "discarding reply for non-oneway invocation ",
-                                "with 'topic' destinationStyle");
-
-            }
-
-            getLogger().log(Level.FINE, "just server sending reply: ", reply);
-            // Check the reply time limit Stream close will call for this
-
-        }
-
-        private void sendResponse() throws JMSException {
-            JMSMessageHeadersType headers = (JMSMessageHeadersType)inMessage
-                .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-            javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
-
-            int deliveryMode = JMSUtils.getJMSDeliveryMode(headers);
-            int priority = JMSUtils.getJMSPriority(headers);
-            long ttl = JMSUtils.getTimeToLive(headers);
-
-            if (ttl <= 0) {
-                ttl = getServerConfig().getMessageTimeToLive();
-            }
-
-            long timeToLive = 0;
-            if (request.getJMSExpiration() > 0) {
-                TimeZone tz = new SimpleTimeZone(0, "GMT");
-                Calendar cal = new GregorianCalendar(tz);
-                timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
-            }
-
-            if (timeToLive >= 0) {
-                ttl = ttl > 0 ? ttl : timeToLive;
-                getLogger().log(Level.FINE, "send out the message!");
-                sender.send(replyTo, reply, deliveryMode, priority, ttl);
-            } else {
-                // the request message had dead
-                getLogger().log(Level.INFO, "Message time to live is already expired skipping response.");
-            }
-        }
-
-        @Override
-        protected void doFlush() throws IOException {
-            // TODO Auto-generated method stub
-
-        }
-
-        @Override
-        protected void doClose() throws IOException {
-
-            commitOutputMessage();
-        }
-
-        @Override
-        protected void onWrite() throws IOException {
-            // Do nothing here
-        }
-
-    }
-
 }

Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java?rev=692344&r1=692343&r2=692344&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java (original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java Thu Sep  4 23:04:05 2008
@@ -19,24 +19,18 @@
 
 package org.apache.cxf.transport.jms;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Calendar;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
 import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
+import javax.jms.QueueConnectionFactory;
 import javax.jms.Session;
-import javax.jms.Topic;
 import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
+import javax.jms.TopicConnectionFactory;
 import javax.naming.Context;
 import javax.naming.NamingException;
 
@@ -57,7 +51,7 @@
  * unidentified producer to send the request message
  * <p>
  * server-side send: each dispatch of a twoway request requires relatively short-term exclusive use of a
- * session and an indentified producer (but not a consumer) - note that the session used for the recieve side
+ * session and an identified producer (but not a consumer) - note that the session used for the receive side
  * cannot be re-used for the send, as MessageListener usage precludes any synchronous sends or receives on
  * that session
  * <p>
@@ -68,13 +62,13 @@
  * strategies make sense ...
  * <p>
  * client-side: a SoftReference-based cache of send/receive sessions is maintained containing an aggregate of
- * a session, indentified producer, temporary reply destination & consumer for same
+ * a session, identified producer, temporary reply destination & consumer for same
  * <p>
  * server-side receive: as sessions cannot be usefully recycled, they are simply created on demand and closed
  * when no longer required
  * <p>
  * server-side send: a SoftReference-based cache of send-only sessions is maintained containing an aggregate
- * of a session and an indentified producer
+ * of a session and an identified producer
  * <p>
  * In a pure client or pure server, only a single cache is ever populated. Where client and server logic is
  * co-located, a client session retrieval for a twoway invocation checks the reply-capable cache first and
@@ -93,80 +87,118 @@
     private int highWaterMark;
 
     private final Context initialContext;
-    private final Connection theConnection;
-    private AbstractTwoStageCache<PooledSession> replyCapableSessionCache;
-    private AbstractTwoStageCache<PooledSession> sendOnlySessionCache;
-    private final Destination theReplyDestination;
-    private final ServerBehaviorPolicyType runtimePolicy;
+    private ConnectionFactory connectionFactory;
+    private final Connection connection;
+    private AbstractTwoStageCache<PooledSession> sessionCache;
     private boolean destinationIsQueue;
 
     /**
      * Constructor.
      * 
+     * @param connectionFactory
      * @param connection the shared {Queue|Topic}Connection
      */
-    public JMSSessionFactory(Connection connection, Destination replyDestination, Context context,
-                             boolean destinationIsQueue, SessionPoolType sessionPoolConfig,
-                             ServerBehaviorPolicyType runtimePolicy) {
-        theConnection = connection;
-        theReplyDestination = replyDestination;
+    protected JMSSessionFactory(ConnectionFactory connectionFactory, Connection connection,
+                                Destination replyDestination, Context context, boolean destinationIsQueue,
+                                SessionPoolType sessionPoolConfig) {
+        this.connectionFactory = connectionFactory;
+        this.connection = connection;
+        this.destinationIsQueue = destinationIsQueue;
         initialContext = context;
-        this.runtimePolicy = runtimePolicy;
 
         lowWaterMark = sessionPoolConfig.getLowWaterMark();
         highWaterMark = sessionPoolConfig.getHighWaterMark();
-        this.destinationIsQueue = destinationIsQueue;
 
         // create session caches (REVISIT sizes should be configurable)
-        //
-
-        if (destinationIsQueue) {
-            // the reply capable cache is only required in the point-to-point
-            // domain
-            //
-            replyCapableSessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark,
-                0, this) {
+        try {
+            sessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0, this) {
                 public final PooledSession create() throws JMSException {
-                    return createPointToPointReplyCapableSession();
+                    return createSession();
                 }
             };
+            sessionCache.populateCache();
+        } catch (Throwable t) {
+            LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
+        }
+    }
 
-            try {
-                replyCapableSessionCache.populateCache();
-            } catch (Throwable t) {
-                LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
-            }
-
-            // send-only cache for point-to-point oneway requests and replies
-            //
-            sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0,
-                this) {
-                public final PooledSession create() throws JMSException {
-                    return createPointToPointSendOnlySession();
-                }
-            };
+    /**
+     * Helper method to create a pooled queue or topic session, depending on the destination style.
+     * 
+     * @return an appropriate pooled session
+     */
+    private PooledSession createSession() throws JMSException {
+        Session session = null;
+        if (destinationIsQueue) {
+            session = ((QueueConnection)connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+        } else {
+            session = ((TopicConnection)connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+        return new PooledSession(session, destinationIsQueue);
+    }
 
-            try {
-                sendOnlySessionCache.populateCache();
-            } catch (Throwable t) {
-                LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
+    /**
+     * This class acts as the hub of JMS provider usage, creating shared JMS Connections and providing access
+     * to a pool of JMS Sessions.
+     * <p>
+     * A new JMS connection is created for each port based <jms:address> - however it's likely that in
+     * practice the same JMS provider will be specified for each port, and hence the connection resources
+     * could be shared across ports.
+     * <p>
+     * For the moment this class is realized as just a container for static methods, but the intention is to
+     * support in future sharing of JMS resources across compatible ports.
+     */
+    protected static JMSSessionFactory connect(AddressType addrDetails, SessionPoolType sessionPoolConfig,
+                                               ServerConfig serverConfig) throws JMSException,
+        NamingException {
+
+        Context context = JMSUtils.getInitialContext(addrDetails);
+        ConnectionFactory connectionFactory;
+        Connection connection = null;
+
+        if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+            QueueConnectionFactory qcf = (QueueConnectionFactory)context.lookup(addrDetails
+                .getJndiConnectionFactoryName());
+            if (addrDetails.isSetConnectionUserName()) {
+                connection = qcf.createQueueConnection(addrDetails.getConnectionUserName(), addrDetails
+                    .getConnectionPassword());
+            } else {
+                connection = qcf.createQueueConnection();
             }
+            connectionFactory = qcf;
         } else {
-            // send-only cache for pub-sub oneway requests
-            //
-            sendOnlySessionCache = new AbstractTwoStageCache<PooledSession>(lowWaterMark, highWaterMark, 0,
-                this) {
-                public final PooledSession create() throws JMSException {
-                    return createPubSubSession(true, false, null);
-                }
-            };
+            TopicConnectionFactory tcf = (TopicConnectionFactory)context.lookup(addrDetails
+                .getJndiConnectionFactoryName());
+            if (addrDetails.isSetConnectionUserName()) {
+                connection = tcf.createTopicConnection(addrDetails.getConnectionUserName(), addrDetails
+                    .getConnectionPassword());
+            } else {
+                connection = tcf.createTopicConnection();
+            }
+            connectionFactory = tcf;
+        }
 
-            try {
-                sendOnlySessionCache.populateCache();
-            } catch (Throwable t) {
-                LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
+        if (null != serverConfig) {
+            String clientID = serverConfig.getDurableSubscriptionClientId();
+
+            if (clientID != null) {
+                connection.setClientID(clientID);
             }
         }
+        connection.start();
+        /*
+         * Destination requestDestination = resolveRequestDestination(context, connection, addrDetails);
+         */
+
+        Destination replyDestination = JMSUtils.resolveReplyDestination(context, connection, addrDetails);
+
+        // create session factory to manage session, reply destination,
+        // producer and consumer pooling
+        //
+        JMSSessionFactory sf = new JMSSessionFactory(connectionFactory, connection, replyDestination,
+                                                     context, JMSUtils.isDestinationStyleQueue(addrDetails),
+                                                     sessionPoolConfig);
+        return sf;
     }
 
     // --java.lang.Object Overrides----------------------------------------------
@@ -175,100 +207,35 @@
     }
 
     // --Methods-----------------------------------------------------------------
-    protected Connection getConnection() {
-        return theConnection;
-    }
 
-    public Queue getQueueFromInitialContext(String queueName) throws NamingException {
-        return (Queue)initialContext.lookup(queueName);
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+    
+    protected Connection getConnection() {
+        return connection;
     }
 
-    public PooledSession get(boolean replyCapable) throws JMSException {
-        return get(null, replyCapable);
+    public Context getInitialContext() {
+        return initialContext;
     }
 
     /**
      * Retrieve a new or cached Session.
      * 
-     * @param replyDest Destination name if coming from wsa:Header
-     * @param replyCapable true iff the session is to be used to receive replies (implies client side twoway
-     *                invocation )
      * @return a new or cached Session
      */
-    public PooledSession get(Destination replyDest, boolean replyCapable) throws JMSException {
+    public PooledSession get() {
         PooledSession ret = null;
 
         synchronized (this) {
-            if (replyCapable) {
-                // first try reply capable cache
-                //
-                ret = replyCapableSessionCache.poll();
-
-                if (ret == null) {
-                    // fall back to send only cache, creating temporary reply
-                    // queue and consumer
-                    //
-                    ret = sendOnlySessionCache.poll();
-
-                    if (ret != null) {
-                        QueueSession session = (QueueSession)ret.session();
-                        Queue destination = null;
-                        String selector = null;
-
-                        if (null != theReplyDestination || null != replyDest) {
-                            destination = null != replyDest ? (Queue)replyDest : (Queue)theReplyDestination;
-
-                            selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'";
-                        }
-
-                        if (destination == null) {
-                            // neither replyDestination not replyDest are present.
-                            destination = session.createTemporaryQueue();
-                            selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'";
-                        }
-
-                        ret.destination(destination);
-                        MessageConsumer consumer = session.createReceiver(destination, selector);
-                        ret.consumer(consumer);
-                    } else {
-                        // no pooled session available in either cache => create one in
-                        // in the reply capable cache
-                        //
-                        try {
-                            ret = replyCapableSessionCache.get();
-                        } catch (Throwable t) {
-                            // factory method may only throw JMSException
-                            //
-                            throw (JMSException)t;
-                        }
-                    }
-                }
-            } else {
-                // first try send only cache
-                //
-                ret = sendOnlySessionCache.poll();
-
-                if (ret == null) {
-                    // fall back to reply capable cache if one exists (only in the
-                    // point-to-point domain), ignoring temporary reply destination
-                    // and consumer
-                    //
-                    if (replyCapableSessionCache != null) {
-                        ret = replyCapableSessionCache.poll();
-                    }
-
-                    if (ret == null) {
-                        // no pooled session available in either cache => create one in
-                        // in the send only cache
-                        //
-                        try {
-                            ret = sendOnlySessionCache.get();
-                        } catch (Throwable t) {
-                            // factory method may only throw JMSException
-                            //
-                            throw (JMSException)t;
-                        }
-                    }
+            ret = sessionCache.poll();
+
+            if (ret == null) {
+                try {
+                    ret = sessionCache.get();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
             }
         }
@@ -277,27 +244,6 @@
     }
 
     /**
-     * Retrieve a new
-     * 
-     * @param destination the target JMS queue or topic (non-null implies server receive side)
-     * @return a new or cached Session
-     */
-    public PooledSession get(Destination destination) throws JMSException {
-        PooledSession ret = null;
-
-        // the destination is only specified on the server receive side,
-        // in which case a new session is always created
-        //
-        if (destinationIsQueue) {
-            ret = createPointToPointServerSession(destination);
-        } else {
-            ret = createPubSubSession(false, true, destination);
-        }
-
-        return ret;
-    }
-
-    /**
      * Return a Session to the pool
      * 
      * @param pooled_session the session to recycle
@@ -305,15 +251,14 @@
     public void recycle(PooledSession pooledSession) {
         // sessions used long-term by the server receive side are not cached,
         // only non-null destinations are temp queues
-        final boolean replyCapable = pooledSession.destination() != null;
+        if (pooledSession == null) {
+            return;
+        }
         boolean discard = false;
 
+        // re-cache session, closing it if it cannot be accommodated
         synchronized (this) {
-            // re-cache session, closing if it cannot be it can be accomodated
-            //
-            discard = replyCapable
-                ? (!replyCapableSessionCache.recycle(pooledSession)) : (!sendOnlySessionCache
-                    .recycle(pooledSession));
+            discard = !sessionCache.recycle(pooledSession);
         }
 
         if (discard) {
@@ -332,124 +277,21 @@
         try {
             PooledSession curr;
 
-            if (replyCapableSessionCache != null) {
-                curr = replyCapableSessionCache.poll();
-                while (curr != null) {
-                    curr.close();
-                    curr = replyCapableSessionCache.poll();
-                }
-            }
-
-            if (sendOnlySessionCache != null) {
-                curr = sendOnlySessionCache.poll();
+            if (sessionCache != null) {
+                curr = sessionCache.poll();
                 while (curr != null) {
                     curr.close();
-                    curr = sendOnlySessionCache.poll();
+                    curr = sessionCache.poll();
                 }
             }
 
-            theConnection.close();
+            connection.close();
         } catch (JMSException e) {
             LOG.log(Level.WARNING, "queue connection close failed: " + e);
         }
 
         // help GC
         //
-        replyCapableSessionCache = null;
-        sendOnlySessionCache = null;
-    }
-
-    /**
-     * Helper method to create a point-to-point pooled session.
-     * 
-     * @param producer true iff producing
-     * @param consumer true iff consuming
-     * @param destination the target destination
-     * @return an appropriate pooled session
-     */
-    PooledSession createPointToPointReplyCapableSession() throws JMSException {
-        QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
-                                                                                   Session.AUTO_ACKNOWLEDGE);
-        Destination destination = null;
-        String selector = null;
-
-        if (null != theReplyDestination) {
-            destination = theReplyDestination;
-
-            selector = "JMSCorrelationID = '" + generateUniqueSelector(session) + "'";
-
-        } else {
-            destination = session.createTemporaryQueue();
-        }
-
-        MessageConsumer consumer = session.createReceiver((Queue)destination, selector);
-        return new PooledSession(session, destination, session.createSender(null), consumer);
-    }
-
-    /**
-     * Helper method to create a point-to-point pooled session.
-     * 
-     * @return an appropriate pooled session
-     */
-    PooledSession createPointToPointSendOnlySession() throws JMSException {
-        QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
-                                                                                   Session.AUTO_ACKNOWLEDGE);
-
-        return new PooledSession(session, null, session.createSender(null), null);
-    }
-
-    /**
-     * Helper method to create a point-to-point pooled session for consumer only.
-     * 
-     * @param destination the target destination
-     * @return an appropriate pooled session
-     */
-    private PooledSession createPointToPointServerSession(Destination destination) throws JMSException {
-        QueueSession session = ((QueueConnection)theConnection).createQueueSession(false,
-                                                                                   Session.AUTO_ACKNOWLEDGE);
-
-        return new PooledSession(session, destination, session.createSender(null), session
-            .createReceiver((Queue)destination, runtimePolicy.getMessageSelector()));
-    }
-
-    /**
-     * Helper method to create a pub-sub pooled session.
-     * 
-     * @param producer true iff producing
-     * @param consumer true iff consuming
-     * @param destination the target destination
-     * @return an appropriate pooled session
-     */
-    PooledSession createPubSubSession(boolean producer, boolean consumer, Destination destination)
-        throws JMSException {
-        TopicSession session = ((TopicConnection)theConnection).createTopicSession(false,
-                                                                                   Session.AUTO_ACKNOWLEDGE);
-        TopicSubscriber sub = null;
-        if (consumer) {
-            String messageSelector = runtimePolicy.getMessageSelector();
-            String durableName = runtimePolicy.getDurableSubscriberName();
-            if (durableName != null) {
-                sub = session
-                    .createDurableSubscriber((Topic)destination, durableName, messageSelector, false);
-            } else {
-                sub = session.createSubscriber((Topic)destination, messageSelector, false);
-            }
-        }
-
-        return new PooledSession(session, null, producer ? session.createPublisher(null) : null, sub);
-    }
-
-    private String generateUniqueSelector(Object obj) {
-        String host = "localhost";
-
-        try {
-            InetAddress addr = InetAddress.getLocalHost();
-            host = addr.getHostName();
-        } catch (UnknownHostException ukex) {
-            // Default to localhost.
-        }
-
-        long time = Calendar.getInstance().getTimeInMillis();
-        return host + "_" + System.getProperty("user.name") + "_" + obj + time;
+        sessionCache = null;
     }
 }

Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=692344&r1=692343&r2=692344&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Thu Sep  4 23:04:05 2008
@@ -30,12 +30,18 @@
 import java.util.logging.Logger;
 
 import javax.jms.BytesMessage;
+import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
@@ -136,7 +142,7 @@
      * @param replyTo the ReplyTo destination if any
      * @return a JMS of the appropriate type populated with the given payload
      */
-    public static Message marshal(Object payload, Session session, Destination replyTo, String messageType)
+    public static Message createAndSetPayload(Object payload, Session session, String messageType)
         throws JMSException {
         Message message = null;
 
@@ -150,32 +156,26 @@
             ((ObjectMessage)message).setObject((byte[])payload);
         }
 
-        if (replyTo != null) {
-            message.setJMSReplyTo(replyTo);
-        }
-
         return message;
     }
 
     /**
-     * Unmarshal the payload of an incoming message.
+     * Extract the payload of an incoming message.
      * 
      * @param message the incoming message
-     * @return the unmarshalled message payload, either of type String or byte[] depending on payload type
+     * @return the message payload as byte[]
      */
-    public static Object unmarshal(Message message) throws JMSException {
-        Object ret = null;
+    public static byte[] retrievePayload(Message message) throws JMSException {
+        byte[] ret = null;
 
         if (message instanceof TextMessage) {
-            ret = ((TextMessage)message).getText();
+            ret = ((TextMessage)message).getText().getBytes();
         } else if (message instanceof BytesMessage) {
-            byte[] retBytes = new byte[(int)((BytesMessage)message).getBodyLength()];
-            ((BytesMessage)message).readBytes(retBytes);
-            ret = retBytes;
+            ret = new byte[(int)((BytesMessage)message).getBodyLength()];
+            ((BytesMessage)message).readBytes(ret);
         } else {
             ret = (byte[])((ObjectMessage)message).getObject();
         }
-
         return ret;
     }
 
@@ -251,7 +251,7 @@
         return headers;
     }
 
-    public static void setContentToProtocalHeader(org.apache.cxf.message.Message message) {
+    public static void setContentToProtocolHeader(org.apache.cxf.message.Message message) {
         String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
 
         Map<String, List<String>> headers = JMSUtils.getSetProtocolHeaders(message);
@@ -268,4 +268,107 @@
     public static boolean isDestinationStyleQueue(AddressType address) {
         return JMSConstants.JMS_QUEUE.equals(address.getDestinationStyle().value());
     }
+
+    /**
+     * Builds the JMS message for an outgoing CXF message: wraps the payload, sets JMSReplyTo when
+     * replyTo is non-null, hands the client request headers to setMessageProperties and the protocol
+     * headers to addProtocolHeaders, and finally sets the JMSCorrelationID. On a two-way exchange a
+     * non-null correlationId argument is used; otherwise the id from getCorrelationId(headers) is used.
+     *
+     * @throws JMSException also when a two-way exchange has both correlationId and a header-supplied id
+     */
+    public static Message buildJMSMessageFromCXFMessage(org.apache.cxf.message.Message outMessage,
+                                                        Object payload, String messageType, Session session,
+                                                        Destination replyTo, String correlationId)
+        throws JMSException {
+        Message jmsMessage = JMSUtils.createAndSetPayload(payload, session, messageType);
+        if (replyTo != null) {
+            jmsMessage.setJMSReplyTo(replyTo);
+        }
+
+        JMSMessageHeadersType requestHeaders = (JMSMessageHeadersType)outMessage
+            .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+        String headerCorrelationId = JMSUtils.getCorrelationId(requestHeaders);
+        JMSUtils.setMessageProperties(requestHeaders, jmsMessage);
+        // ensure that the contentType is set to the out jms message header
+        JMSUtils.setContentToProtocolHeader(outMessage);
+        Map<String, List<String>> protocolHeaders = CastUtils.cast((Map<?, ?>)outMessage
+            .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
+        JMSUtils.addProtocolHeaders(jmsMessage, protocolHeaders);
+
+        String effectiveId = headerCorrelationId;
+        if (!outMessage.getExchange().isOneWay() && correlationId != null) {
+            if (headerCorrelationId != null) {
+                throw new JMSException("User cannot set JMSCorrelationID when "
+                    + "making a request/reply invocation using " + "a static replyTo Queue.");
+            }
+            effectiveId = correlationId;
+        }
+        // Without any correlation id whatever comes back is accepted as the response; this is
+        // assumed to happen only with a temporary reply queue.
+        if (effectiveId != null) {
+            jmsMessage.setJMSCorrelationID(effectiveId);
+        }
+        return jmsMessage;
+    }
+
+    /**
+     * Sends via the domain-specific API: QueueSender for a Queue destination, else TopicPublisher.
+     * TODO: can this become producer.send(destination, jmsMessage, deliveryMode, priority, timeToLive)?
+     */
+    public static void sendMessage(MessageProducer producer, Destination destination, Message jmsMessage,
+                                   long timeToLive, int deliveryMode, int priority) throws JMSException {
+        if (!(destination instanceof Queue)) {
+            TopicPublisher topicPublisher = (TopicPublisher)producer;
+            topicPublisher.setTimeToLive(timeToLive);
+            topicPublisher.publish((Topic)destination, jmsMessage, deliveryMode, priority, timeToLive);
+            return;
+        }
+        QueueSender queueSender = (QueueSender)producer;
+        queueSender.setTimeToLive(timeToLive);
+        queueSender.send((Queue)destination, jmsMessage, deliveryMode, priority, timeToLive);
+    }
+
+    /** Resolves the request destination: JNDI lookup first, else created by JMS name; may be null. */
+    public static Destination resolveRequestDestination(Context context, Connection connection,
+        AddressType addrDetails) throws JMSException, NamingException {
+        Destination requestDestination = null;
+        if (addrDetails.getJndiDestinationName() != null) {
+            requestDestination = (Destination)context.lookup(addrDetails.getJndiDestinationName());
+        }
+        if (requestDestination == null && addrDetails.getJmsDestinationName() != null) {
+            // the session is needed only to create the destination; always close it again
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            try {
+                if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+                    requestDestination = session.createQueue(addrDetails.getJmsDestinationName());
+                } else {
+                    requestDestination = session.createTopic(addrDetails.getJmsDestinationName());
+                }
+            } finally {
+                session.close();
+            }
+        }
+        return requestDestination;
+    }
+
+    /** Resolves the reply queue for queue-style addresses (JNDI first, else by JMS name), or null. */
+    public static Queue resolveReplyDestination(Context context, Connection connection,
+        AddressType addrDetails) throws NamingException, JMSException {
+        Queue replyDestination = null;
+        if (JMSUtils.isDestinationStyleQueue(addrDetails)) {
+            if (addrDetails.getJndiReplyDestinationName() != null) {
+                replyDestination = (Queue)context.lookup(addrDetails.getJndiReplyDestinationName());
+            }
+            if (replyDestination == null && addrDetails.getJmsReplyDestinationName() != null) {
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                try {
+                    replyDestination = session.createQueue(addrDetails.getJmsReplyDestinationName());
+                } finally { // release the helper session even when createQueue throws
+                    session.close();
+                }
+            }
+        }
+        return replyDestination;
+    }
 }