You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by se...@apache.org on 2009/01/03 07:33:41 UTC

svn commit: r730924 [3/9] - in /webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules: base/src/main/java/org/apache/axis2/format/ base/src/main/java/org/apache/axis2/transport/base/ base/src/main/java/org/apache/axis2/transport/base/data...

Modified: webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java?rev=730924&r1=730923&r2=730924&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java (original)
+++ webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java Fri Jan  2 22:33:39 2009
@@ -15,81 +15,85 @@
 */
 package org.apache.axis2.transport.jms;
 
+import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
-import org.apache.axis2.transport.base.threads.WorkerPool;
 import org.apache.axis2.transport.base.BaseUtils;
 import org.apache.axis2.transport.base.BaseConstants;
 import org.apache.axis2.transport.base.MetricsCollector;
+import org.apache.axis2.transport.jms.ctype.ContentTypeInfo;
 import org.apache.axis2.description.Parameter;
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import javax.jms.*;
 import javax.xml.namespace.QName;
+import javax.transaction.UserTransaction;
 
 /**
- * This is the actual receiver which listens for and accepts JMS messages, and
- * hands them over to be processed by a worker thread. An instance of this
- * class is created for each JMSConnectionFactory, but all instances may and
- * will share the same worker thread pool held by the JMSListener
+ * This is the JMS message receiver which is invoked when a message is received. This processes
+ * the message through the engine
  */
-public class JMSMessageReceiver implements MessageListener {
+public class JMSMessageReceiver {
 
     private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);
 
     /** The JMSListener */
     private JMSListener jmsListener = null;
-    /** The thread pool of workers */
-    private WorkerPool workerPool = null;
-    /** The Axis configuration context */
-    private ConfigurationContext cfgCtx = null;
-    /** A reference to the JMS Connection Factory to which this applies */
+    /** A reference to the JMS Connection Factory */
     private JMSConnectionFactory jmsConnectionFactory = null;
-    /** The name of the service this message receiver is bound to. */
-    final String serviceName;
-    /** Metrics collector */
+    /** The JMS metrics collector */
     private MetricsCollector metrics = null;
+    /** The endpoint this message receiver is bound to */
+    final JMSEndpoint endpoint;
 
     /**
      * Create a new JMSMessage receiver
      *
      * @param jmsListener the JMS transport Listener
-     * @param jmsConFac the JMS connection factory we are associated with
-     * @param workerPool the worker thread pool to be used
-     * @param cfgCtx the axis ConfigurationContext
+     * @param jmsConFac   the JMS connection factory we are associated with
+     * @param workerPool  the worker thread pool to be used
+     * @param cfgCtx      the axis ConfigurationContext
      * @param serviceName the name of the Axis service
+     * @param endpoint    the JMSEndpoint definition to be used
      */
-    JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac,
-                       WorkerPool workerPool, ConfigurationContext cfgCtx, String serviceName) {
+    JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) {
         this.jmsListener = jmsListener;
         this.jmsConnectionFactory = jmsConFac;
-        this.workerPool = workerPool;
-        this.cfgCtx = cfgCtx;
-        this.serviceName = serviceName;
+        this.endpoint = endpoint;
         this.metrics = jmsListener.getMetricsCollector();
     }
 
     /**
-     * The entry point on the reception of each JMS message
+     * Process a new message received
      *
      * @param message the JMS message received
+     * @param ut      UserTransaction which was used to receive the message
+     * @return true if caller should commit
      */
-    public void onMessage(Message message) {
-        // directly create a new worker and delegate processing
+    public boolean onMessage(Message message, UserTransaction ut) {
+
         try {
             if (log.isDebugEnabled()) {
                 StringBuffer sb = new StringBuffer();
-                sb.append("Received JMS message to destination : " + message.getJMSDestination());                
-                sb.append("\nMessage ID : " + message.getJMSMessageID());
-                sb.append("\nCorrelation ID : " + message.getJMSCorrelationID());
-                sb.append("\nReplyTo ID : " + message.getJMSReplyTo());
+                sb.append("Received new JMS message for service :").append(endpoint.getServiceName());
+                sb.append("\nDestination    : ").append(message.getJMSDestination());
+                sb.append("\nMessage ID     : ").append(message.getJMSMessageID());
+                sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID());
+                sb.append("\nReplyTo        : ").append(message.getJMSReplyTo());
+                sb.append("\nRedelivery ?   : ").append(message.getJMSRedelivered());
+                sb.append("\nPriority       : ").append(message.getJMSPriority());
+                sb.append("\nExpiration     : ").append(message.getJMSExpiration());
+                sb.append("\nTimestamp      : ").append(message.getJMSTimestamp());
+                sb.append("\nMessage Type   : ").append(message.getJMSType());
+                sb.append("\nPersistent ?   : ").append(
+                    DeliveryMode.PERSISTENT == message.getJMSDeliveryMode());
+
                 log.debug(sb.toString());
                 if (log.isTraceEnabled() && message instanceof TextMessage) {
-                    log.trace("\nMessage : " + ((TextMessage) message).getText());    
+                    log.trace("\nMessage : " + ((TextMessage) message).getText());
                 }
             }
         } catch (JMSException e) {
@@ -100,134 +104,130 @@
 
         // update transport level metrics
         try {
-            if (message instanceof BytesMessage) {
-                metrics.incrementBytesReceived((JMSUtils.getBodyLength((BytesMessage) message)));
-            } else if (message instanceof MapMessage) {
-                metrics.incrementBytesReceived((JMSUtils.getBodyLength((MapMessage) message)));
-            } else if (message instanceof TextMessage) {
-                metrics.incrementBytesReceived(((TextMessage) message).getText().getBytes().length);
-            } else {
-                handleException("Unsupported JMS message type : " + message.getClass().getName());
-            }
+            metrics.incrementBytesReceived(JMSUtils.getMessageSize(message));
         } catch (JMSException e) {
             log.warn("Error reading JMS message size to update transport metrics", e);
         }
 
         // has this message already expired? expiration time == 0 means never expires
         try {
-            long expiryTime = message.getJMSExpiration();                        
+            long expiryTime = message.getJMSExpiration();
             if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
                 if (log.isDebugEnabled()) {
                     log.debug("Discard expired message with ID : " + message.getJMSMessageID());
                 }
-                return;
+                return true;
             }
         } catch (JMSException ignore) {}
 
-        workerPool.execute(new Worker(message));
-    }
 
-    private void handleException(String msg, Exception e) {
-        log.error(msg, e);
-        throw new AxisJMSException(msg, e);
-    }
+        boolean successful = false;
+        try {
+            successful = processThoughEngine(message, ut);
 
-    private void handleException(String msg) {
-        log.error(msg);
-        throw new AxisJMSException(msg);
-    }
+        } catch (JMSException e) {
+            log.error("JMS Exception encountered while processing", e);
+        } catch (AxisFault e) {
+            log.error("Axis fault processing message", e);
+        } catch (Exception e) {
+            log.error("Unknown error processing message", e);
 
+        } finally {
+            if (successful) {
+                metrics.incrementMessagesReceived();
+            } else {
+                metrics.incrementFaultsReceiving();
+            }
+        }
+
+        return successful;
+    }
 
     /**
-     * The actual Worker implementation which will process the
-     * received JMS messages in the worker thread pool
+     * Process the new message through Axis2
+     *
+     * @param message the JMS message
+     * @param ut      the UserTransaction used for receipt
+     * @return true if the caller should commit
+     * @throws JMSException, on JMS exceptions
+     * @throws AxisFault     on Axis2 errors
      */
-    class Worker implements Runnable {
+    private boolean processThoughEngine(Message message, UserTransaction ut)
+        throws JMSException, AxisFault {
 
-        private Message message = null;
+        MessageContext msgContext = jmsListener.createMessageContext();
 
-        Worker(Message message) {
-            this.message = message;
+        // set the JMS Message ID as the Message ID of the MessageContext
+        try {
+            msgContext.setMessageID(message.getJMSMessageID());
+            msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
+        } catch (JMSException ignore) {}
+
+        String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION);
+
+        AxisService service = endpoint.getService();
+        msgContext.setAxisService(service);
+
+        // find the operation for the message, or default to one
+        Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
+        QName operationQName = (
+            operationParam != null ?
+                BaseUtils.getQNameFromString(operationParam.getValue()) :
+                BaseConstants.DEFAULT_OPERATION);
+
+        AxisOperation operation = service.getOperation(operationQName);
+        if (operation != null) {
+            msgContext.setAxisOperation(operation);
+            msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
         }
 
-        public void run() {
-
-            MessageContext msgContext = jmsListener.createMessageContext();
-
-            // set the JMS Message ID as the Message ID of the MessageContext
-            try {
-                msgContext.setMessageID(message.getJMSMessageID());
-                msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
-            } catch (JMSException ignore) {}
-
-            AxisService service = null;
-            try {
-                String soapAction = JMSUtils.
-                    getProperty(message, BaseConstants.SOAPACTION);
-
-                // set to bypass dispatching if we know the service - we already should!
-                if (serviceName != null) {
-                    service = cfgCtx.getAxisConfiguration().getService(serviceName);
-                    msgContext.setAxisService(service);
-
-                    // find the operation for the message, or default to one
-                    Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
-                    QName operationQName = (
-                        operationParam != null ?
-                            BaseUtils.getQNameFromString(operationParam.getValue()) :
-                            BaseConstants.DEFAULT_OPERATION);
-
-                    AxisOperation operation = service.getOperation(operationQName);
-                    if (operation != null) {
-                        msgContext.setAxisOperation(operation);
-                        msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
-                    }
-                }
+        ContentTypeInfo contentTypeInfo =
+            endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
+        if (contentTypeInfo == null) {
+            throw new AxisFault("Unable to determine content type for message " +
+                msgContext.getMessageID());
+        }
 
-                // set the message property OUT_TRANSPORT_INFO
-                // the reply is assumed to be over the JMSReplyTo destination, using
-                // the same incoming connection factory, if a JMSReplyTo is available
-                if (message.getJMSReplyTo() != null) {
-                    msgContext.setProperty(
-                        Constants.OUT_TRANSPORT_INFO,
-                        new JMSOutTransportInfo(jmsConnectionFactory, message.getJMSReplyTo()));
-
-                } else if (service != null) {
-                    // does the service specify a default reply destination ?
-                    Parameter param = service.getParameter(JMSConstants.REPLY_PARAM);
-                    if (param != null && param.getValue() != null) {
-                        msgContext.setProperty(
-                            Constants.OUT_TRANSPORT_INFO,
-                            new JMSOutTransportInfo(
-                                jmsConnectionFactory,
-                                jmsConnectionFactory.getDestination((String) param.getValue())));
-                    }
-                }
+        // set the message property OUT_TRANSPORT_INFO
+        // the reply is assumed to be over the JMSReplyTo destination, using
+        // the same incoming connection factory, if a JMSReplyTo is available
+        Destination replyTo = message.getJMSReplyTo();
+        if (replyTo == null) {
+            // does the service specify a default reply destination ?
+            Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION);
+            if (param != null && param.getValue() != null) {
+                replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
+            }
 
-                String contentType = null;
-                if (service != null) {
-                    contentType = (String)service.getParameterValue(JMSConstants.CONTENT_TYPE_PARAM);
-                }
-                if (contentType == null) {
-                    contentType
-                        = JMSUtils.getProperty(message, BaseConstants.CONTENT_TYPE);
-                }
-                
-                JMSUtils.setSOAPEnvelope(message, msgContext, contentType);
+        }
+        if (replyTo != null) {
+            msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
+                new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
+                    contentTypeInfo.getPropertyName()));
+        }
 
-                jmsListener.handleIncomingMessage(
-                    msgContext,
-                    JMSUtils.getTransportHeaders(message),
-                    soapAction,
-                    contentType
-                );
-                metrics.incrementMessagesReceived();
+        JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
+        if (ut != null) {
+            msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut);
+        }
 
-            } catch (Throwable e) {
-                metrics.incrementFaultsReceiving();
-                jmsListener.error(service, e);
-                log.error("Exception while processing incoming message", e);
+        try {
+            jmsListener.handleIncomingMessage(
+                msgContext,
+                JMSUtils.getTransportHeaders(message),
+                soapAction,
+                contentTypeInfo.getContentType());
+
+        } finally {
+
+            Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY);
+            if (o != null) {
+                if ((o instanceof Boolean && ((Boolean) o)) ||
+                    (o instanceof String && Boolean.valueOf((String) o))) {
+                    return false;
+                }
             }
+            return true;
         }
     }
 }

Modified: webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?rev=730924&r1=730923&r2=730924&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (original)
+++ webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java Fri Jan  2 22:33:39 2009
@@ -16,6 +16,7 @@
 package org.apache.axis2.transport.jms;
 
 import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.axis2.transport.base.BaseUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -40,8 +41,8 @@
     /** The naming context */
     private Context context;
     /**
-     * this is a reference to the underlying JMS connection factory when sending messages
-     * through connection factories not defined to the transport sender
+     * this is a reference to the underlying JMS ConnectionFactory when sending messages
+     * through connection factories not defined at the TransportSender level
      */
     private ConnectionFactory connectionFactory = null;
     /**
@@ -52,31 +53,39 @@
     private JMSConnectionFactory jmsConnectionFactory = null;
     /** the Destination queue or topic for the outgoing message */
     private Destination destination = null;
-    /** the Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */
-    private String destinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
+    /** the Destination queue or topic for the outgoing message
+     * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+     */
+    private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
     /** the Reply Destination queue or topic for the outgoing message */
     private Destination replyDestination = null;
     /** the Reply Destination name */
     private String replyDestinationName = null;
-    /** the Reply Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */
-    private String replyDestinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
+    /** the Reply Destination queue or topic for the outgoing message
+     * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+     */
+    private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
     /** the EPR properties when the out-transport info is generated from a target EPR */
     private Hashtable<String,String> properties = null;
     /** the target EPR string where applicable */
     private String targetEPR = null;
-    private String contentType = null;
-
+    /** the message property name that stores the content type of the outgoing message */
+    private String contentTypeProperty;
+    
     /**
      * Creates an instance using the given JMS connection factory and destination
      *
      * @param jmsConnectionFactory the JMS connection factory
      * @param dest the destination
+     * @param contentTypeProperty
      */
-    JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest) {
+    JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest,
+            String contentTypeProperty) {
         this.jmsConnectionFactory = jmsConnectionFactory;
         this.destination = dest;
         destinationType = dest instanceof Topic ? JMSConstants.DESTINATION_TYPE_TOPIC
                                                 : JMSConstants.DESTINATION_TYPE_QUEUE;
+        this.contentTypeProperty = contentTypeProperty;
     }
 
     /**
@@ -85,28 +94,31 @@
      * @param targetEPR the target EPR
      */
     JMSOutTransportInfo(String targetEPR) {
+
         this.targetEPR = targetEPR;
         if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) {
             handleException("Invalid prefix for a JMS EPR : " + targetEPR);
+
         } else {
-            properties = JMSUtils.getProperties(targetEPR);
-            String destinationType = properties.get(JMSConstants.DEST_PARAM_TYPE);
-            if(destinationType != null) {
+            properties = BaseUtils.getEPRProperties(targetEPR);
+            String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE);
+            if (destinationType != null) {
                 setDestinationType(destinationType);
             }
-            String replyDestinationType = properties.get(JMSConstants.REPLY_PARAM_TYPE);
-            if(replyDestinationType != null) {
+
+            String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE);
+            if (replyDestinationType != null) {
                 setReplyDestinationType(replyDestinationType);
             }
-            String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM);
-            if(replyDestinationName != null) {
-                setReplyDestinationName(replyDestinationName);
-            }
+
+            replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
+            contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
             try {
                 context = new InitialContext(properties);
             } catch (NamingException e) {
                 handleException("Could not get an initial context using " + properties, e);
             }
+
             destination = getDestination(context, targetEPR);
             replyDestination = getReplyDestination(context, targetEPR);
         }
@@ -132,7 +144,7 @@
     private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) {
         try {
 
-            String conFacJndiName = props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM);
+            String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME);
             if (conFacJndiName != null) {
                 return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName);
             } else {
@@ -156,8 +168,12 @@
         try {
             return JMSUtils.lookup(context, Destination.class, destinationName);
         } catch (NameNotFoundException e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Cannot locate destination : " + destinationName + " using " + url);
+            try {
+                return JMSUtils.lookup(context, Destination.class,
+                    (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ?
+                        "dynamicTopics/" : "dynamicQueues/") + destinationName);
+            } catch (NamingException x) {
+                handleException("Cannot locate destination : " + destinationName + " using " + url);
             }
         } catch (NamingException e) {
             handleException("Cannot locate destination : " + destinationName + " using " + url, e);
@@ -173,10 +189,11 @@
      * @return the JMS destination, or null if it does not exist
      */
     private Destination getReplyDestination(Context context, String url) {
-        String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM);
+        String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
         if(replyDestinationName == null) {
             return null;
         }
+
         try {
             return JMSUtils.lookup(context, Destination.class, replyDestinationName);
         } catch (NameNotFoundException e) {
@@ -186,13 +203,14 @@
         } catch (NamingException e) {
             handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e);
         }
+
         return null;
     }
 
     /**
      * Look up for the given destination
-     * @param replyDest
-     * @return
+     * @param replyDest the JNDI name to lookup Destination required
+     * @return Destination for the JNDI name passed
      */
     public Destination getReplyDestination(String replyDest) {
         try {
@@ -232,7 +250,7 @@
     }
 
     public void setContentType(String contentType) {
-        this.contentType = contentType;
+        // this is a useless Axis2 method imposed by the OutTransportInfo interface :(
     }
 
     public Hashtable<String,String> getProperties() {
@@ -276,4 +294,12 @@
     public void setReplyDestinationName(String replyDestinationName) {
         this.replyDestinationName = replyDestinationName;
     }
+
+    public String getContentTypeProperty() {
+        return contentTypeProperty;
+    }
+
+    public void setContentTypeProperty(String contentTypeProperty) {
+        this.contentTypeProperty = contentTypeProperty;
+    }
 }

Modified: webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=730924&r1=730923&r2=730924&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/commons/trunk/scratch/senaka/sci-flex/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Fri Jan  2 22:33:39 2009
@@ -23,6 +23,7 @@
 import org.apache.axiom.om.OMNode;
 import org.apache.axiom.om.ds.MapDataSource;
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.TransportOutDescription;
@@ -30,20 +31,19 @@
 import org.apache.axis2.transport.MessageFormatter;
 import org.apache.axis2.transport.OutTransportInfo;
 import org.apache.axis2.transport.base.*;
+import org.apache.axis2.transport.base.streams.WriterOutputStream;
 import org.apache.axis2.transport.http.HTTPConstants;
-import org.apache.commons.logging.LogFactory;
 
 import javax.jms.*;
-import javax.jms.Queue;
 import javax.activation.DataHandler;
-import javax.naming.Context;
 import javax.xml.stream.XMLStreamException;
-
 import java.beans.XMLDecoder;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.nio.charset.UnsupportedCharsetException;
 import java.util.*;
 
 /**
@@ -51,34 +51,23 @@
  */
 public class JMSSender extends AbstractTransportSender implements ManagementSupport {
 
-    public static final String TRANSPORT_NAME = "jms";
-    
-    private JMSConnectionFactoryManager connFacManager;
+    public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
 
-    public JMSSender() {
-        log = LogFactory.getLog(JMSSender.class);
-    }
+    /** The JMS connection factory manager to be used when sending messages out */
+    private JMSConnectionFactoryManager connFacManager;
 
     /**
      * Initialize the transport sender by reading pre-defined connection factories for
-     * outgoing messages. These will create sessions (one per each destination dealth with)
-     * to be used when messages are being sent.
+     * outgoing messages.
+     *
      * @param cfgCtx the configuration context
      * @param transportOut the transport sender definition from axis2.xml
      * @throws AxisFault on error
      */
     public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
         super.init(cfgCtx, transportOut);
-        connFacManager = new JMSConnectionFactoryManager(cfgCtx);
-        // read the connection factory definitions and create them
-        connFacManager.loadConnectionFactoryDefinitions(transportOut);
-        connFacManager.start();
-    }
-
-    @Override
-    public void stop() {
-        connFacManager.stop();
-        super.stop();
+        connFacManager = new JMSConnectionFactoryManager(transportOut);
+        log.info("JMS Transport Sender initialized...");
     }
 
     /**
@@ -90,9 +79,9 @@
      */
     private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) {
         Map<String,String> props = trpInfo.getProperties();
-        if(trpInfo.getProperties() != null) {
-            String jmsConnectionFactoryName = props.get(JMSConstants.CONFAC_PARAM);
-            if(jmsConnectionFactoryName != null) {
+        if (trpInfo.getProperties() != null) {
+            String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC);
+            if (jmsConnectionFactoryName != null) {
                 return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
             } else {
                 return connFacManager.getJMSConnectionFactory(props);
@@ -109,84 +98,85 @@
         OutTransportInfo outTransportInfo) throws AxisFault {
 
         JMSConnectionFactory jmsConnectionFactory = null;
-        Connection connection = null;   // holds a one time connection if used
         JMSOutTransportInfo jmsOut = null;
-        Session session = null;
-        Destination replyDestination = null;
+        JMSMessageSender messageSender = null;
 
-        try {
-            if (targetAddress != null) {
+        if (targetAddress != null) {
 
-                jmsOut = new JMSOutTransportInfo(targetAddress);
-                // do we have a definition for a connection factory to use for this address?
-                jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
+            jmsOut = new JMSOutTransportInfo(targetAddress);
+            // do we have a definition for a connection factory to use for this address?
+            jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
+            
+            if (jmsConnectionFactory != null) {
+                messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress);
 
-                if (jmsConnectionFactory != null) {
-                    // create new or get existing session to send to the destination from the CF
-                    session = jmsConnectionFactory.getSessionForDestination(
-                        JMSUtils.getDestination(targetAddress));
+            } else {
+                try {
+                    messageSender = JMSUtils.createJMSSender(jmsOut);
+                } catch (JMSException e) {
+                    handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+                }
+            }
 
-                } else {
-                    // digest the targetAddress and locate CF from the EPR
-                    jmsOut.loadConnectionFactoryFromProperies();
-                    try {
-                        // create a one time connection and session to be used
-                        Hashtable<String,String> jndiProps = jmsOut.getProperties();
-                        String user = jndiProps.get(Context.SECURITY_PRINCIPAL);
-                        String pass = jndiProps.get(Context.SECURITY_CREDENTIALS);
-
-                        QueueConnectionFactory qConFac = null;
-                        TopicConnectionFactory tConFac = null;
-
-                        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
-                            qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
-                        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
-                            tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
-                        } else {
-                            handleException("Unable to determine type of JMS " +
-                                "Connection Factory - i.e Queue/Topic");
-                        }
+        } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
 
-                        if (user != null && pass != null) {
-                            if (qConFac != null) {
-                                connection = qConFac.createQueueConnection(user, pass);
-                            } else if (tConFac != null) {
-                                connection = tConFac.createTopicConnection(user, pass);
-                            }
-                        } else {
-                           if (qConFac != null) {
-                                connection = qConFac.createQueueConnection();
-                            } else if (tConFac != null) {
-                                connection = tConFac.createTopicConnection();
-                            }
-                        }
+            jmsOut = (JMSOutTransportInfo) outTransportInfo;
+            try {
+                messageSender = JMSUtils.createJMSSender(jmsOut);
+            } catch (JMSException e) {
+                handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+            }
+        }
 
-                        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
-                            session = ((QueueConnection)connection).
-                                createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-                        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
-                            session = ((TopicConnection)connection).
-                                createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-                        }
+        // The message property to be used to send the content type is determined by
+        // the out transport info, i.e. either from the EPR if we are sending a request,
+        // or, if we are sending a response, from the configuration of the service that
+        // received the request). The property name can be overridden by a message
+        // context property.
+        String contentTypeProperty =
+            (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
+        if (contentTypeProperty == null) {
+            contentTypeProperty = jmsOut.getContentTypeProperty();
+        }
 
-                    } catch (JMSException e) {
-                        handleException("Error creating a connection/session for : " + targetAddress, e);
-                    }
-                }
-                replyDestination = jmsOut.getReplyDestination();
+        // need to synchronize as Sessions are not thread safe
+        synchronized (messageSender.getSession()) {
+            try {
+                sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
+            } finally {
+                messageSender.close();
+            }
+        }
+    }
 
-            } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
+    /**
+     * Perform actual sending of the JMS message
+     */
+    private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender,
+        String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory,
+        JMSOutTransportInfo jmsOut) throws AxisFault {
+        
+        // convert the axis message context into a JMS Message that we can send over JMS
+        Message message = null;
+        String correlationId = null;
+        try {
+            message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty);
+        } catch (JMSException e) {
+            handleException("Error creating a JMS message from the message context", e);
+        }
 
-                jmsOut = (JMSOutTransportInfo) outTransportInfo;
-                jmsConnectionFactory = jmsOut.getJmsConnectionFactory();
+        // should we wait for a synchronous response on this same thread?
+        boolean waitForResponse = waitForSynchronousResponse(msgCtx);
+        Destination replyDestination = jmsOut.getReplyDestination();
 
-                session = jmsConnectionFactory.getSessionForDestination(
-                    jmsOut.getDestination().toString());
-            }
-            
-            Destination destination = jmsOut.getDestination();
+        // if this is a synchronous out-in, prepare to listen on the response destination
+        if (waitForResponse) {
 
             String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
+            if (replyDestName == null && jmsConnectionFactory != null) {
+                replyDestName = jmsConnectionFactory.getReplyToDestination();
+            }
+
             if (replyDestName != null) {
                 if (jmsConnectionFactory != null) {
                     replyDestination = jmsConnectionFactory.getDestination(replyDestName);
@@ -194,103 +184,45 @@
                     replyDestination = jmsOut.getReplyDestination(replyDestName);
                 }
             }
+            replyDestination = JMSUtils.setReplyDestination(
+                replyDestination, messageSender.getSession(), message);
+        }
 
-            if(session == null) {
-               handleException("Could not create JMS session");
-            }
-
-            // now we are going to use the JMS session, but if this was a session from a
-            // defined JMS connection factory, we need to synchronize as sessions are not
-            // thread safe
-            synchronized(session) {
-
-                // convert the axis message context into a JMS Message that we can send over JMS
-                Message message = null;
-                String correlationId = null;
-                try {
-                    message = createJMSMessage(msgCtx, session);
-                } catch (JMSException e) {
-                    handleException("Error creating a JMS message from the axis message context", e);
-                }
-
-                String destinationType = jmsOut.getDestinationType();
-
-                // if the destination does not exist, see if we can create it
-                destination = JMSUtils.createDestinationIfRequired(
-                    destination, destinationType, targetAddress, session);
-
-                if(jmsOut.getReplyDestinationName() != null) {
-                    replyDestination = JMSUtils.createReplyDestinationIfRequired(
-                        replyDestination, jmsOut.getReplyDestinationName(),
-                        jmsOut.getReplyDestinationType(), targetAddress, session);
-                }
-
-                // should we wait for a synchronous response on this same thread?
-                boolean waitForResponse = waitForSynchronousResponse(msgCtx);
-
-                // if this is a synchronous out-in, prepare to listen on the response destination
-                if (waitForResponse) {
-                    replyDestination = JMSUtils.setReplyDestination(
-                        replyDestination, session, message);
-                }
+        try {
+            messageSender.send(message, msgCtx);
+            metrics.incrementMessagesSent(msgCtx);
 
-                // send the outgoing message over JMS to the destination selected
-                try {
-                    JMSUtils.sendMessageToJMSDestination(session, destination, destinationType, message);
+        } catch (AxisJMSException e) {
+            metrics.incrementFaultsSending();
+            handleException("Error sending JMS message", e);
+        }
 
-                    // set the actual MessageID to the message context for use by any others
-                    try {
-                        String msgId = message.getJMSMessageID();
-                        if (msgId != null) {
-                            msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId);
-                        }
-                    } catch (JMSException ignore) {}
+        try {
+            metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message));
+        } catch (JMSException e) {
+            log.warn("Error reading JMS message size to update transport metrics", e);
+        }
 
-                    metrics.incrementMessagesSent();
-                    try {
-                        if (message instanceof BytesMessage) {
-                            metrics.incrementBytesSent(JMSUtils.getBodyLength((BytesMessage) message));
-                        } else if (message instanceof MapMessage) {
-                            metrics.incrementBytesSent((JMSUtils.getBodyLength((MapMessage) message)));
-                        } else if (message instanceof TextMessage) {
-                            metrics.incrementBytesSent((
-                                (TextMessage) message).getText().getBytes().length);
-                        } else {
-                            handleException("Unsupported JMS message type : " +
-                                message.getClass().getName());
-                        }
-                    } catch (JMSException e) {
-                        log.warn("Error reading JMS message size to update transport metrics", e);
-                    }
-                } catch (BaseTransportException e) {
-                    metrics.incrementFaultsSending();
-                    throw e;
-                }
+        // if we are expecting a synchronous response back for the message sent out
+        if (waitForResponse) {
+            // TODO ********************************************************************************
+            // TODO **** replace with asynchronous polling via a poller task to process this *******
+            // information would be given. Then it should poll (until timeout) the
+            // requested destination for the response message and inject it from a
+            // asynchronous worker thread
+            try {
+                messageSender.getConnection().start();  // multiple calls are safely ignored
+            } catch (JMSException ignore) {}
 
-                // if we are expecting a synchronous response back for the message sent out
-                if (waitForResponse) {
-                    if (connection != null) {
-                        try {
-                            connection.start();
-                        } catch (JMSException ignore) {}
-                    } else {
-                        // If connection is null, we are using a cached session and the underlying
-                        // connection is already started. Thus, there is nothing to do here.
-                    }
-                    try {
-                        correlationId = message.getJMSMessageID();
-                    } catch(JMSException ignore) {}
-                        waitForResponseAndProcess(session, replyDestination,
-                                jmsOut.getReplyDestinationType(), msgCtx, correlationId);
-                }
-            }
+            try {
+                correlationId = message.getJMSMessageID();
+            } catch(JMSException ignore) {}
 
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (JMSException ignore) {}
-            }
+            // We assume here that the response uses the same message property to
+            // specify the content type of the message.
+            waitForResponseAndProcess(messageSender.getSession(), replyDestination,
+                msgCtx, correlationId, contentTypeProperty);
+            // TODO ********************************************************************************
         }
     }
 
@@ -301,28 +233,18 @@
      * @param session the session to use to listen for the response
      * @param replyDestination the JMS reply Destination
      * @param msgCtx the outgoing message for which we are expecting the response
+     * @param contentTypeProperty the message property used to determine the content type
+     *                            of the response message
      * @throws AxisFault on error
      */
     private void waitForResponseAndProcess(Session session, Destination replyDestination,
-        String replyDestinationType, MessageContext msgCtx, String correlationId) throws AxisFault {
+            MessageContext msgCtx, String correlationId,
+            String contentTypeProperty) throws AxisFault {
 
         try {
-            MessageConsumer consumer = null;
-            if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(replyDestinationType)) {
-                if (correlationId != null) {
-                    consumer = ((QueueSession) session).createReceiver((Queue) replyDestination,
-                        "JMSCorrelationID = '" + correlationId + "'");
-                } else {
-                    consumer = ((QueueSession) session).createReceiver((Queue) replyDestination);
-                }
-            } else {
-                if (correlationId != null) {
-                    consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination,
-                        "JMSCorrelationID = '" + correlationId + "'", false);
-                } else {
-                    consumer = ((TopicSession) session).createSubscriber((Topic) replyDestination);
-                }
-            }
+            MessageConsumer consumer;
+            consumer = JMSUtils.createConsumer(session, replyDestination,
+                "JMSCorrelationID = '" + correlationId + "'");
 
             // how long are we willing to wait for the sync response
             long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
@@ -344,23 +266,13 @@
                 // update transport level metrics
                 metrics.incrementMessagesReceived();                
                 try {
-                    if (reply instanceof BytesMessage) {
-                        metrics.incrementBytesReceived(JMSUtils.getBodyLength((BytesMessage) reply));
-                    } else if (reply instanceof MapMessage) {
-                        metrics.incrementBytesReceived((JMSUtils.getBodyLength((MapMessage) reply)));
-                    } else if (reply instanceof TextMessage) {
-                        metrics.incrementBytesReceived((
-                            (TextMessage) reply).getText().getBytes().length);
-                    } else {
-                        handleException("Unsupported JMS message type : " +
-                            reply.getClass().getName());
-                    }
+                    metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply));
                 } catch (JMSException e) {
                     log.warn("Error reading JMS message size to update transport metrics", e);
                 }
 
                 try {
-                    processSyncResponse(msgCtx, reply);
+                    processSyncResponse(msgCtx, reply, contentTypeProperty);
                     metrics.incrementMessagesReceived();
                 } catch (AxisFault e) {
                     metrics.incrementFaultsReceiving();
@@ -376,8 +288,9 @@
 
         } catch (JMSException e) {
             metrics.incrementFaultsReceiving();
-            handleException("Error creating consumer or receiving reply to : " +
-                replyDestination, e);
+            handleException("Error creating a consumer, or receiving a synchronous reply " +
+                "for outgoing MessageContext ID : " + msgCtx.getMessageID() +
+                " and reply Destination : " + replyDestination, e);
         }
     }
 
@@ -387,12 +300,14 @@
      *
      * @param msgContext the MessageContext
      * @param session    the JMS session
+     * @param contentTypeProperty the message property to be used to store the
+     *                            content type
      * @return a JMS message from the context and session
      * @throws JMSException on exception
      * @throws AxisFault on exception
      */
-    private Message createJMSMessage(MessageContext msgContext, Session session)
-            throws JMSException, AxisFault {
+    private Message createJMSMessage(MessageContext msgContext, Session session,
+            String contentTypeProperty) throws JMSException, AxisFault {
 
         Message message = null;
         String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE);
@@ -416,20 +331,7 @@
             String contentType = messageFormatter.getContentType(
                 msgContext, format, msgContext.getSoapAction());
 
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            try {
-                messageFormatter.writeTo(msgContext, format, baos, true);
-                baos.flush();
-            } catch (IOException e) {
-                handleException("IO Error while creating BytesMessage", e);
-            }
-
-            if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
-                contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
-                message = session.createBytesMessage();
-                BytesMessage bytesMsg = (BytesMessage) message;
-                bytesMsg.writeBytes(baos.toByteArray());
-            } else if (msgType != null && JMSConstants.JMS_MAP_MESSAGE.equals(msgType)) {
+            if (msgType != null && JMSConstants.JMS_MAP_MESSAGE.equals(msgType)) {
                 OMElement wrapper = msgContext.getEnvelope().getBody().getFirstElement();
                 if (wrapper != null && wrapper instanceof OMSourcedElement) {
                     OMSourcedElement omNode = (OMSourcedElement) wrapper;
@@ -499,15 +401,43 @@
                     }
                 }
             } else {
-                message = session.createTextMessage();  // default
-                TextMessage txtMsg = (TextMessage) message;
+                boolean useBytesMessage =
+                    msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
+                        contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1;
+
+                OutputStream out;
+                StringWriter sw;
+                if (useBytesMessage) {
+                    BytesMessage bytesMsg = session.createBytesMessage();
+                    sw = null;
+                    out = new BytesMessageOutputStream(bytesMsg);
+                    message = bytesMsg;
+                } else {
+                sw = new StringWriter();
+                    try {
+                        out = new WriterOutputStream(sw, format.getCharSetEncoding());
+                    } catch (UnsupportedCharsetException ex) {
+                        handleException("Unsupported encoding " + format.getCharSetEncoding(), ex);
+                        return null;
+                    }
+                }
+            
                 try {
-                    txtMsg.setText(new String(baos.toByteArray(), format.getCharSetEncoding()));
-                } catch (UnsupportedEncodingException ex) {
-                    handleException("Unsupported encoding " + format.getCharSetEncoding(), ex);
+                    messageFormatter.writeTo(msgContext, format, out, true);
+                    out.close();
+                } catch (IOException e) {
+                    handleException("IO Error while creating BytesMessage", e);
                 }
+
+                if (!useBytesMessage) {
+                    TextMessage txtMsg = session.createTextMessage();
+                    txtMsg.setText(sw.toString());
+                    message = txtMsg;
+                }
+            }        
+            if (contentTypeProperty != null) {
+                message.setStringProperty(contentTypeProperty, contentType);
             }
-            message.setStringProperty(BaseConstants.CONTENT_TYPE, contentType);
 
         } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) {
             message = session.createBytesMessage();
@@ -518,14 +448,12 @@
             if (omNode != null && omNode instanceof OMText) {
                 Object dh = ((OMText) omNode).getDataHandler();
                 if (dh != null && dh instanceof DataHandler) {
-                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                     try {
-                        ((DataHandler) dh).writeTo(baos);
+                        ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg));
                     } catch (IOException e) {
                         handleException("Error serializing binary content of element : " +
                             BaseConstants.DEFAULT_BINARY_WRAPPER, e);
                     }
-                    bytesMsg.writeBytes(baos.toByteArray());
                 }
             }
 
@@ -534,6 +462,7 @@
             TextMessage txtMsg = (TextMessage) message;
             txtMsg.setText(msgContext.getEnvelope().getBody().
                 getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText());
+
         } else if (JMSConstants.JMS_MAP_MESSAGE.equals(jmsPayloadType)) {
             OMElement wrapper = msgContext.getEnvelope().getBody().
                 getFirstChildWithName(BaseConstants.DEFAULT_MAP_WRAPPER);
@@ -656,9 +585,12 @@
      *
      * @param outMsgCtx the outgoing message for which we are expecting the response
      * @param message the JMS response message received
+     * @param contentTypeProperty the message property used to determine the content type
+     *                            of the response message
      * @throws AxisFault on error
      */
-    private void processSyncResponse(MessageContext outMsgCtx, Message message) throws AxisFault {
+    private void processSyncResponse(MessageContext outMsgCtx, Message message,
+            String contentTypeProperty) throws AxisFault {
 
         MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx);
 
@@ -671,7 +603,9 @@
         // workaround as Axis2 1.2 is about to be released and Synapse 1.0
         responseMsgCtx.setServerSide(false);
 
-        String contentType = JMSUtils.getProperty(message, BaseConstants.CONTENT_TYPE);
+        String contentType =
+                contentTypeProperty == null ? null
+                        : JMSUtils.getProperty(message, contentTypeProperty);
 
         try {
             JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType);