You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/11/15 17:11:12 UTC

svn commit: r717873 [2/4] - in /webservices/commons/trunk/scratch/asankha/transport/modules: base/src/main/java/org/apache/axis2/transport/base/ jms/ jms/src/main/java/org/apache/axis2/transport/jms/ tests/ tests/src/test/java/org/apache/axis2/transpor...

Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java?rev=717873&r1=717872&r2=717873&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java Sat Nov 15 08:11:11 2008
@@ -17,7 +17,6 @@
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
-import org.apache.axis2.transport.base.threads.WorkerPool;
 import org.apache.axis2.transport.base.BaseUtils;
 import org.apache.axis2.transport.base.BaseConstants;
 import org.apache.axis2.transport.base.MetricsCollector;
@@ -25,73 +24,76 @@
 import org.apache.axis2.description.Parameter;
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import javax.jms.*;
 import javax.xml.namespace.QName;
+import javax.transaction.UserTransaction;
 
 /**
- * This is the actual receiver which listens for and accepts JMS messages, and
- * hands them over to be processed by a worker thread. An instance of this
- * class is created for each JMSConnectionFactory, but all instances may and
- * will share the same worker thread pool held by the JMSListener
+ * This is the JMS message receiver which is invoked when a message is received. This processes
+ * the message through the engine
  */
-public class JMSMessageReceiver implements MessageListener {
+public class JMSMessageReceiver {
 
     private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);
 
     /** The JMSListener */
     private JMSListener jmsListener = null;
-    /** The thread pool of workers */
-    private WorkerPool workerPool = null;
-    /** The Axis configuration context */
-    private ConfigurationContext cfgCtx = null;
-    /** A reference to the JMS Connection Factory to which this applies */
+    /** A reference to the JMS Connection Factory */
     private JMSConnectionFactory jmsConnectionFactory = null;
-    /** The endpoint this message receiver is bound to. */
-    final JMSEndpoint endpoint;
-    /** Metrics collector */
+    /** The JMS metrics collector */
     private MetricsCollector metrics = null;
+    /** The endpoint this message receiver is bound to */
+    final JMSEndpoint endpoint;
 
     /**
      * Create a new JMSMessage receiver
      *
      * @param jmsListener the JMS transport Listener
-     * @param jmsConFac the JMS connection factory we are associated with
-     * @param workerPool the worker thread pool to be used
-     * @param cfgCtx the axis ConfigurationContext
+     * @param jmsConFac   the JMS connection factory we are associated with
+     * @param workerPool  the worker thread pool to be used
+     * @param cfgCtx      the axis ConfigurationContext
      * @param serviceName the name of the Axis service
+     * @param endpoint    the JMSEndpoint definition to be used
      */
-    JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac,
-                       WorkerPool workerPool, ConfigurationContext cfgCtx, JMSEndpoint endpoint) {
+    JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) {
         this.jmsListener = jmsListener;
         this.jmsConnectionFactory = jmsConFac;
-        this.workerPool = workerPool;
-        this.cfgCtx = cfgCtx;
         this.endpoint = endpoint;
         this.metrics = jmsListener.getMetricsCollector();
     }
 
     /**
-     * The entry point on the reception of each JMS message
+     * Process a new message received
      *
      * @param message the JMS message received
+     * @param ut      UserTransaction which was used to receive the message
+     * @return true if caller should commit
      */
-    public void onMessage(Message message) {
-        // directly create a new worker and delegate processing
+    public boolean onMessage(Message message, UserTransaction ut) {
+
         try {
             if (log.isDebugEnabled()) {
                 StringBuffer sb = new StringBuffer();
-                sb.append("Received JMS message to destination : " + message.getJMSDestination());                
-                sb.append("\nMessage ID : " + message.getJMSMessageID());
-                sb.append("\nCorrelation ID : " + message.getJMSCorrelationID());
-                sb.append("\nReplyTo ID : " + message.getJMSReplyTo());
+                sb.append("Received new JMS message for service :").append(endpoint.getServiceName());
+                sb.append("\nDestination    : ").append(message.getJMSDestination());
+                sb.append("\nMessage ID     : ").append(message.getJMSMessageID());
+                sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID());
+                sb.append("\nReplyTo        : ").append(message.getJMSReplyTo());
+                sb.append("\nRedelivery ?   : ").append(message.getJMSRedelivered());
+                sb.append("\nPriority       : ").append(message.getJMSPriority());
+                sb.append("\nExpiration     : ").append(message.getJMSExpiration());
+                sb.append("\nTimestamp      : ").append(message.getJMSTimestamp());
+                sb.append("\nMessage Type   : ").append(message.getJMSType());
+                sb.append("\nPersistent ?   : ").append(
+                    DeliveryMode.PERSISTENT == message.getJMSDeliveryMode());
+
                 log.debug(sb.toString());
                 if (log.isTraceEnabled() && message instanceof TextMessage) {
-                    log.trace("\nMessage : " + ((TextMessage) message).getText());    
+                    log.trace("\nMessage : " + ((TextMessage) message).getText());
                 }
             }
         } catch (JMSException e) {
@@ -109,112 +111,123 @@
 
         // has this message already expired? expiration time == 0 means never expires
         try {
-            long expiryTime = message.getJMSExpiration();                        
+            long expiryTime = message.getJMSExpiration();
             if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
                 if (log.isDebugEnabled()) {
                     log.debug("Discard expired message with ID : " + message.getJMSMessageID());
                 }
-                return;
+                return true;
             }
         } catch (JMSException ignore) {}
 
-        workerPool.execute(new Worker(message));
-    }
 
-    private void handleException(String msg, Exception e) {
-        log.error(msg, e);
-        throw new AxisJMSException(msg, e);
-    }
+        boolean successful = false;
+        try {
+            successful = processThoughEngine(message, ut);
 
-    private void handleException(String msg) {
-        log.error(msg);
-        throw new AxisJMSException(msg);
-    }
+        } catch (JMSException e) {
+            log.error("JMS Exception encountered while processing", e);
+        } catch (AxisFault e) {
+            log.error("Axis fault processing message", e);
+        } catch (Exception e) {
+            log.error("Unknown error processing message", e);
 
+        } finally {
+            if (successful) {
+                metrics.incrementMessagesReceived();
+            } else {
+                metrics.incrementFaultsReceiving();
+            }
+        }
+
+        return successful;
+    }
 
     /**
-     * The actual Worker implementation which will process the
-     * received JMS messages in the worker thread pool
+     * Process the new message through Axis2
+     *
+     * @param message the JMS message
+     * @param ut      the UserTransaction used for receipt
+     * @return true if the caller should commit
+     * @throws JMSException, on JMS exceptions
+     * @throws AxisFault     on Axis2 errors
      */
-    class Worker implements Runnable {
+    private boolean processThoughEngine(Message message, UserTransaction ut)
+        throws JMSException, AxisFault {
 
-        private Message message = null;
+        MessageContext msgContext = jmsListener.createMessageContext();
 
-        Worker(Message message) {
-            this.message = message;
-        }
+        // set the JMS Message ID as the Message ID of the MessageContext
+        try {
+            msgContext.setMessageID(message.getJMSMessageID());
+            msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
+        } catch (JMSException ignore) {}
 
-        public void run() {
+        String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION);
 
-            MessageContext msgContext = jmsListener.createMessageContext();
+        AxisService service = endpoint.getService();
+        msgContext.setAxisService(service);
 
-            // set the JMS Message ID as the Message ID of the MessageContext
-            try {
-                msgContext.setMessageID(message.getJMSMessageID());
-                msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
-            } catch (JMSException ignore) {}
+        // find the operation for the message, or default to one
+        Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
+        QName operationQName = (
+            operationParam != null ?
+                BaseUtils.getQNameFromString(operationParam.getValue()) :
+                BaseConstants.DEFAULT_OPERATION);
+
+        AxisOperation operation = service.getOperation(operationQName);
+        if (operation != null) {
+            msgContext.setAxisOperation(operation);
+            msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
+        }
 
-            AxisService service = null;
-            try {
-                String soapAction = JMSUtils.
-                    getProperty(message, BaseConstants.SOAPACTION);
+        ContentTypeInfo contentTypeInfo =
+            endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
+        if (contentTypeInfo == null) {
+            throw new AxisFault("Unable to determine content type for message " +
+                msgContext.getMessageID());
+        }
 
-                service = endpoint.getService();
-                msgContext.setAxisService(service);
+        // set the message property OUT_TRANSPORT_INFO
+        // the reply is assumed to be over the JMSReplyTo destination, using
+        // the same incoming connection factory, if a JMSReplyTo is available
+        Destination replyTo = message.getJMSReplyTo();
+        if (replyTo == null) {
+            // does the service specify a default reply destination ?
+            Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION);
+            if (param != null && param.getValue() != null) {
+                replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
+            }
 
-                // find the operation for the message, or default to one
-                Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
-                QName operationQName = (
-                    operationParam != null ?
-                        BaseUtils.getQNameFromString(operationParam.getValue()) :
-                        BaseConstants.DEFAULT_OPERATION);
+        }
+        if (replyTo != null) {
+            msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
+                new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
+                    contentTypeInfo.getPropertyName()));
+        }
 
-                AxisOperation operation = service.getOperation(operationQName);
-                if (operation != null) {
-                    msgContext.setAxisOperation(operation);
-                    msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
-                }
+        JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
+        if (ut != null) {
+            msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut);
+        }
 
-                ContentTypeInfo contentTypeInfo =
-                        endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
-                if (contentTypeInfo == null) {
-                    throw new AxisFault("Unable to determine content type for message " +
-                            msgContext.getMessageID());
-                }
-                
-                // set the message property OUT_TRANSPORT_INFO
-                // the reply is assumed to be over the JMSReplyTo destination, using
-                // the same incoming connection factory, if a JMSReplyTo is available
-                Destination replyTo = message.getJMSReplyTo();
-                if (replyTo == null) {
-                    // does the service specify a default reply destination ?
-                    Parameter param = service.getParameter(JMSConstants.REPLY_PARAM);
-                    if (param != null && param.getValue() != null) {
-                        replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
-                    }
-                    
-                }
-                if (replyTo != null) {
-                    msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
-                            new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
-                                                    contentTypeInfo.getPropertyName()));
+        try {
+            jmsListener.handleIncomingMessage(
+                msgContext,
+                JMSUtils.getTransportHeaders(message),
+                soapAction,
+                contentTypeInfo.getContentType());
+
+        } finally {
+
+            Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY);
+            if (o != null) {
+                if ((o instanceof Boolean && ((Boolean) o)) ||
+                    (o instanceof String && Boolean.valueOf((String) o))) {
+                    return false;
                 }
-
-                JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
-
-                jmsListener.handleIncomingMessage(
-                    msgContext,
-                    JMSUtils.getTransportHeaders(message),
-                    soapAction,
-                    contentTypeInfo.getContentType()
-                );
-                metrics.incrementMessagesReceived();
-
-            } catch (Throwable e) {
-                metrics.incrementFaultsReceiving();
-                jmsListener.error(service, e);
-                log.error("Exception while processing incoming message", e);
             }
+            return true;
         }
     }
 }

Added: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java?rev=717873&view=auto
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java (added)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java Sat Nov 15 08:11:11 2008
@@ -0,0 +1,318 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.transport.base.BaseConstants;
+
+import javax.jms.*;
+import javax.transaction.*;
+
+/**
+ * Performs the actual sending of a JMS message, and the subsequent committing of a JTA transaction
+ * (if requested) or the local session transaction, if used. An instance of this class is unique
+ * to a single message send out operation and will not be shared.
+ */
+public class JMSMessageSender {
+
+    private static final Log log = LogFactory.getLog(JMSMessageSender.class);
+
+    /** The Connection to be used to send out */
+    private Connection connection = null;
+    /** The Session to be used to send out */
+    private Session session = null;
+    /** The MessageProducer used */
+    private MessageProducer producer = null;
+    /** Target Destination */
+    private Destination destination = null;
+    /** The level of cachability for resources */
+    private int cacheLevel = JMSConstants.CACHE_CONNECTION;
+    /** Should this sender use JMS 1.1 ? (if false, defaults to 1.0.2b) */
+    private boolean jmsSpec11 = true;
+    /** Are we sending to a Queue ? */
+    private Boolean isQueue = null;
+
+    /**
+     * This is a low-end method to support the one-time sends using JMS 1.0.2b
+     * @param connection the JMS Connection
+     * @param session JMS Session
+     * @param producer the MessageProducer
+     * @param destination the JMS Destination
+     * @param cacheLevel cacheLevel - None | Connection | Session | Producer
+     * @param jmsSpec11 true if the JMS 1.1 API should be used
+     * @param isQueue posting to a Queue?
+     */
+    public JMSMessageSender(Connection connection, Session session, MessageProducer producer,
+        Destination destination, int cacheLevel, boolean jmsSpec11, Boolean isQueue) {
+
+        this.connection = connection;
+        this.session = session;
+        this.producer = producer;
+        this.destination = destination;
+        this.cacheLevel = cacheLevel;
+        this.jmsSpec11 = jmsSpec11;
+        this.isQueue = isQueue;
+    }
+
+    /**
+     * Create a JMSSender using a JMSConnectionFactory and target EPR
+     *
+     * @param jmsConnectionFactory the JMSConnectionFactory
+     * @param targetAddress target EPR
+     */
+    public JMSMessageSender(JMSConnectionFactory jmsConnectionFactory, String targetAddress) {
+
+        if (jmsConnectionFactory != null) {
+            this.cacheLevel  = jmsConnectionFactory.getCacheLevel();
+            this.jmsSpec11   = jmsConnectionFactory.isJmsSpec11();
+            this.connection  = jmsConnectionFactory.getConnection();
+            this.session     = jmsConnectionFactory.getSession(connection);
+            this.destination =
+                jmsConnectionFactory.getSharedDestination() == null ?
+                    jmsConnectionFactory.getDestination(JMSUtils.getDestination(targetAddress)) :
+                    jmsConnectionFactory.getSharedDestination();
+            this.producer = jmsConnectionFactory.getMessageProducer(connection, session, destination);
+
+        } else {
+            JMSOutTransportInfo jmsOut = new JMSOutTransportInfo(targetAddress);
+            jmsOut.loadConnectionFactoryFromProperies();
+        }
+    }
+
+    /**
+     * Perform actual send of JMS message to the Destination selected
+     *
+     * @param message the JMS message
+     * @param msgCtx the Axis2 MessageContext
+     */
+    public void send(Message message, MessageContext msgCtx) {
+
+        Boolean jtaCommit    = getBooleanProperty(msgCtx, BaseConstants.JTA_COMMIT_AFTER_SEND);
+        Boolean rollbackOnly = getBooleanProperty(msgCtx, BaseConstants.SET_ROLLBACK_ONLY);
+        Boolean persistent   = getBooleanProperty(msgCtx, JMSConstants.JMS_DELIVERY_MODE);
+        Integer priority     = getIntegerProperty(msgCtx, JMSConstants.JMS_PRIORITY);
+        Integer timeToLive   = getIntegerProperty(msgCtx, JMSConstants.JMS_TIME_TO_LIVE);
+
+        // Do not commit, if message is marked for rollback
+        if (rollbackOnly != null && rollbackOnly) {
+            jtaCommit = Boolean.FALSE;
+        }
+
+        if (persistent != null) {
+            try {
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            } catch (JMSException e) {
+                handleException("Error setting JMS Producer for PERSISTENT delivery", e);
+            }
+        }
+        if (priority != null) {
+            try {
+                producer.setPriority(priority);
+            } catch (JMSException e) {
+                handleException("Error setting JMS Producer priority to : " + priority, e);
+            }
+        }
+        if (timeToLive != null) {
+            try {
+                producer.setTimeToLive(timeToLive);
+            } catch (JMSException e) {
+                handleException("Error setting JMS Producer TTL to : " + timeToLive, e);
+            }
+        }
+
+        boolean sendingSuccessful = false;
+        // perform actual message sending
+        try {
+            if (jmsSpec11 || isQueue == null) {
+                producer.send(message);
+
+            } else {
+                if (isQueue) {
+                    ((QueueSender) producer).send(message);
+
+                } else {
+                    ((TopicPublisher) producer).publish(message);
+                }
+            }
+
+            // set the actual MessageID to the message context for use by any others down the line
+            String msgId = null;
+            try {
+                msgId = message.getJMSMessageID();
+                if (msgId != null) {
+                    msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId);
+                }
+            } catch (JMSException ignore) {}
+
+            sendingSuccessful = true;
+           
+            if (log.isDebugEnabled()) {
+                log.debug("Sent Message Context ID : " + msgCtx.getMessageID() +
+                    " with JMS Message ID : " + msgId +
+                    " to destination : " + producer.getDestination());
+            }
+
+        } catch (JMSException e) {
+            log.error("Error sending message with MessageContext ID : " +
+                msgCtx.getMessageID() + " to destination : " + destination, e);
+
+        } finally {
+
+            if (jtaCommit != null) {
+
+                UserTransaction ut = (UserTransaction) msgCtx.getProperty(BaseConstants.USER_TRANSACTION);
+                if (ut != null) {
+
+                    try {
+                        if (sendingSuccessful && jtaCommit) {
+                            ut.commit();
+                        } else {
+                            ut.rollback();
+                        }
+                        msgCtx.removeProperty(BaseConstants.USER_TRANSACTION);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug((sendingSuccessful ? "Committed" : "Rolled back") +
+                                " JTA Transaction");
+                        }
+
+                    } catch (Exception e) {
+                        handleException("Error committing/rolling back JTA transaction after " +
+                            "sending of message with MessageContext ID : " + msgCtx.getMessageID() +
+                            " to destination : " + destination, e);
+                    }
+                }
+
+            } else {
+                try {
+                    if (session.getTransacted()) {
+                        if (sendingSuccessful && (rollbackOnly == null || !rollbackOnly)) {
+                            session.commit();
+                        } else {
+                            session.rollback();
+                        }
+                    }
+
+                    if (log.isDebugEnabled()) {
+                        log.debug((sendingSuccessful ? "Committed" : "Rolled back") +
+                            " local (JMS Session) Transaction");
+                    }
+
+                } catch (JMSException e) {
+                    handleException("Error committing/rolling back local (i.e. session) " +
+                        "transaction after sending of message with MessageContext ID : " + 
+                        msgCtx.getMessageID() + " to destination : " + destination, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Close non-shared producer, session and connection if any
+     */
+    public void close() {
+        if (cacheLevel < JMSConstants.CACHE_PRODUCER) {
+            try {
+                producer.close();
+            } catch (JMSException e) {
+                log.error("Error closing JMS MessageProducer after send", e);
+            }
+        }
+
+        if (cacheLevel < JMSConstants.CACHE_SESSION) {
+            try {
+                session.close();
+            } catch (JMSException e) {
+                log.error("Error closing JMS Session after send", e);
+            }
+        }
+
+        if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+            try {
+                connection.close();
+            } catch (JMSException e) {
+                log.error("Error closing JMS Connection after send", e);
+            }
+        }
+    }
+
+    private void handleException(String message, Exception e) {
+        log.error(message, e);
+        throw new AxisJMSException(message, e);
+    }
+
+    private Boolean getBooleanProperty(MessageContext msgCtx, String name) {
+        Object o = msgCtx.getProperty(name);
+        if (o != null) {
+            if (o instanceof Boolean) {
+                return (Boolean) o;
+            } else if (o instanceof String) {
+                return Boolean.valueOf((String) o);
+            }
+        }
+        return null;
+    }
+
+    private Integer getIntegerProperty(MessageContext msgCtx, String name) {
+        Object o = msgCtx.getProperty(name);
+        if (o != null) {
+            if (o instanceof Integer) {
+                return (Integer) o;
+            } else if (o instanceof String) {
+                return Integer.parseInt((String) o);
+            }
+        }
+        return null;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    public void setSession(Session session) {
+        this.session = session;
+    }
+
+    public void setProducer(MessageProducer producer) {
+        this.producer = producer;
+    }
+
+    public void setCacheLevel(int cacheLevel) {
+        this.cacheLevel = cacheLevel;
+    }
+
+    public int getCacheLevel() {
+        return cacheLevel;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public MessageProducer getProducer() {
+        return producer;
+    }
+
+    public Session getSession() {
+        return session;
+    }
+}

Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?rev=717873&r1=717872&r2=717873&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java Sat Nov 15 08:11:11 2008
@@ -41,8 +41,8 @@
     /** The naming context */
     private Context context;
     /**
-     * this is a reference to the underlying JMS connection factory when sending messages
-     * through connection factories not defined to the transport sender
+     * this is a reference to the underlying JMS ConnectionFactory when sending messages
+     * through connection factories not defined at the TransportSender level
      */
     private ConnectionFactory connectionFactory = null;
     /**
@@ -53,14 +53,18 @@
     private JMSConnectionFactory jmsConnectionFactory = null;
     /** the Destination queue or topic for the outgoing message */
     private Destination destination = null;
-    /** the Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */
-    private String destinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
+    /** the Destination queue or topic for the outgoing message
+     * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+     */
+    private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
     /** the Reply Destination queue or topic for the outgoing message */
     private Destination replyDestination = null;
     /** the Reply Destination name */
     private String replyDestinationName = null;
-    /** the Reply Destination queue or topic for the outgoing message i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */
-    private String replyDestinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
+    /** the Reply Destination queue or topic for the outgoing message
+     * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+     */
+    private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
     /** the EPR properties when the out-transport info is generated from a target EPR */
     private Hashtable<String,String> properties = null;
     /** the target EPR string where applicable */
@@ -68,13 +72,12 @@
     /** the message property name that stores the content type of the outgoing message */
     private String contentTypeProperty;
     
-    private String contentType = null;
-
     /**
      * Creates an instance using the given JMS connection factory and destination
      *
      * @param jmsConnectionFactory the JMS connection factory
      * @param dest the destination
+     * @param contentTypeProperty
      */
     JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest,
             String contentTypeProperty) {
@@ -91,26 +94,31 @@
      * @param targetEPR the target EPR
      */
     JMSOutTransportInfo(String targetEPR) {
+
         this.targetEPR = targetEPR;
         if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) {
             handleException("Invalid prefix for a JMS EPR : " + targetEPR);
+
         } else {
             properties = BaseUtils.getEPRProperties(targetEPR);
-            String destinationType = properties.get(JMSConstants.DEST_PARAM_TYPE);
-            if(destinationType != null) {
+            String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE);
+            if (destinationType != null) {
                 setDestinationType(destinationType);
             }
-            String replyDestinationType = properties.get(JMSConstants.REPLY_PARAM_TYPE);
-            if(replyDestinationType != null) {
+
+            String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE);
+            if (replyDestinationType != null) {
                 setReplyDestinationType(replyDestinationType);
             }
-            replyDestinationName = properties.get(JMSConstants.REPLY_PARAM);
+
+            replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
             contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
             try {
                 context = new InitialContext(properties);
             } catch (NamingException e) {
                 handleException("Could not get an initial context using " + properties, e);
             }
+
             destination = getDestination(context, targetEPR);
             replyDestination = getReplyDestination(context, targetEPR);
         }
@@ -136,7 +144,7 @@
     private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) {
         try {
 
-            String conFacJndiName = props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM);
+            String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME);
             if (conFacJndiName != null) {
                 return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName);
             } else {
@@ -177,10 +185,11 @@
      * @return the JMS destination, or null if it does not exist
      */
     private Destination getReplyDestination(Context context, String url) {
-        String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM);
+        String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
         if(replyDestinationName == null) {
             return null;
         }
+
         try {
             return JMSUtils.lookup(context, Destination.class, replyDestinationName);
         } catch (NameNotFoundException e) {
@@ -190,13 +199,14 @@
         } catch (NamingException e) {
             handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e);
         }
+
         return null;
     }
 
     /**
      * Look up for the given destination
-     * @param replyDest
-     * @return
+     * @param replyDest the JNDI name to lookup Destination required
+     * @return Destination for the JNDI name passed
      */
     public Destination getReplyDestination(String replyDest) {
         try {
@@ -236,7 +246,7 @@
     }
 
     public void setContentType(String contentType) {
-        this.contentType = contentType;
+        // this is a useless Axis2 method imposed by the OutTransportInfo interface :(
     }
 
     public Hashtable<String,String> getProperties() {

Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=717873&r1=717872&r2=717873&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Sat Nov 15 08:11:11 2008
@@ -20,6 +20,7 @@
 import org.apache.axiom.om.OMText;
 import org.apache.axiom.om.OMNode;
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.TransportOutDescription;
@@ -29,11 +30,9 @@
 import org.apache.axis2.transport.base.*;
 import org.apache.axis2.transport.base.streams.WriterOutputStream;
 import org.apache.axis2.transport.http.HTTPConstants;
-import org.apache.commons.logging.LogFactory;
 
 import javax.jms.*;
 import javax.activation.DataHandler;
-import javax.naming.Context;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.StringWriter;
@@ -45,34 +44,23 @@
  */
 public class JMSSender extends AbstractTransportSender implements ManagementSupport {
 
-    public static final String TRANSPORT_NAME = "jms";
-    
-    private JMSConnectionFactoryManager connFacManager;
+    public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
 
-    public JMSSender() {
-        log = LogFactory.getLog(JMSSender.class);
-    }
+    /** The JMS connection factory manager to be used when sending messages out */
+    private JMSConnectionFactoryManager connFacManager;
 
     /**
      * Initialize the transport sender by reading pre-defined connection factories for
-     * outgoing messages. These will create sessions (one per each destination dealth with)
-     * to be used when messages are being sent.
+     * outgoing messages.
+     *
      * @param cfgCtx the configuration context
      * @param transportOut the transport sender definition from axis2.xml
      * @throws AxisFault on error
      */
     public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
         super.init(cfgCtx, transportOut);
-        connFacManager = new JMSConnectionFactoryManager(cfgCtx);
-        // read the connection factory definitions and create them
-        connFacManager.loadConnectionFactoryDefinitions(transportOut);
-        connFacManager.start();
-    }
-
-    @Override
-    public void stop() {
-        connFacManager.stop();
-        super.stop();
+        connFacManager = new JMSConnectionFactoryManager(transportOut);
+        log.info("JMS Transport Sender initialized...");
     }
 
     /**
@@ -85,7 +73,7 @@
     private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) {
         Map<String,String> props = trpInfo.getProperties();
         if(trpInfo.getProperties() != null) {
-            String jmsConnectionFactoryName = props.get(JMSConstants.CONFAC_PARAM);
+            String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC);
             if(jmsConnectionFactoryName != null) {
                 return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
             } else {
@@ -103,58 +91,87 @@
         OutTransportInfo outTransportInfo) throws AxisFault {
 
         JMSConnectionFactory jmsConnectionFactory = null;
-        Connection connection = null;   // holds a one time connection if used
-        JMSOutTransportInfo jmsOut;
-        Session session = null;
-        Destination replyDestination = null;
+        JMSOutTransportInfo jmsOut = null;
+        JMSMessageSender messageSender = null;
 
-        try {
-            if (targetAddress != null) {
+        if (targetAddress != null) {
 
-                jmsOut = new JMSOutTransportInfo(targetAddress);
-                // do we have a definition for a connection factory to use for this address?
-                jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
+            jmsOut = new JMSOutTransportInfo(targetAddress);
+            // do we have a definition for a connection factory to use for this address?
+            jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
+            
+            if (jmsConnectionFactory != null) {
+                messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress);
 
-                if (jmsConnectionFactory != null) {
-                    // create new or get existing session to send to the destination from the CF
-                    session = jmsConnectionFactory.getSessionForDestination(
-                        JMSUtils.getDestination(targetAddress));
+            } else {
+                try {
+                    messageSender = JMSUtils.createJMSSender(jmsOut);
+                } catch (JMSException e) {
+                    handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+                }
+            }
 
-                } else {
-                    // digest the targetAddress and locate CF from the EPR
-                    jmsOut.loadConnectionFactoryFromProperies();
-                    try {
-                        // create a one time connection and session to be used
-                        Hashtable<String,String> jndiProps = jmsOut.getProperties();
-                        connection = JMSUtils.createConnection(jmsOut.getConnectionFactory(),
-                                jndiProps.get(Context.SECURITY_PRINCIPAL),
-                                jndiProps.get(Context.SECURITY_CREDENTIALS),
-                                jmsOut.getDestinationType());
+        } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
+
+            jmsOut = (JMSOutTransportInfo) outTransportInfo;
+            try {
+                messageSender = JMSUtils.createJMSSender(jmsOut);
+            } catch (JMSException e) {
+                handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+            }
+        }
 
-                        session = JMSUtils.createSession(connection, false,
-                                Session.AUTO_ACKNOWLEDGE, jmsOut.getDestinationType());
+        // The message property to be used to send the content type is determined by
+        // the out transport info, i.e. either from the EPR if we are sending a request,
+        // or, if we are sending a response, from the configuration of the service that
+        // received the request). The property name can be overridden by a message
+        // context property.
+        String contentTypeProperty =
+            (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
+        if (contentTypeProperty == null) {
+            contentTypeProperty = jmsOut.getContentTypeProperty();
+        }
 
-                    } catch (JMSException e) {
-                        handleException("Error creating a connection/session for : " + targetAddress, e);
-                    }
-                }
-                replyDestination = jmsOut.getReplyDestination();
+        if (messageSender.getCacheLevel() < JMSConstants.CACHE_SESSION) {
+            // only connection has been cached at most
+            sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
+            
+        } else {
+            // need to synchronize as Sessions are not thread safe
+            synchronized (messageSender.getSession()) {
+                sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
+            }
+        }
+    }
 
-            } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
+    /**
+     * Perform actual sending of the JMS message
+     */
+    private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender,
+        String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory,
+        JMSOutTransportInfo jmsOut) throws AxisFault {
+        
+        // convert the axis message context into a JMS Message that we can send over JMS
+        Message message = null;
+        String correlationId = null;
+        try {
+            message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty);
+        } catch (JMSException e) {
+            handleException("Error creating a JMS message from the message context", e);
+        }
 
-                jmsOut = (JMSOutTransportInfo) outTransportInfo;
-                jmsConnectionFactory = jmsOut.getJmsConnectionFactory();
+        // should we wait for a synchronous response on this same thread?
+        boolean waitForResponse = waitForSynchronousResponse(msgCtx);
+        Destination replyDestination = jmsOut.getReplyDestination();
 
-                session = jmsConnectionFactory.getSessionForDestination(
-                    jmsOut.getDestination().toString());
-            } else {
-                handleException("Unable to get JMSOutTransportInfo");
-                return; // We never get here. Just make the compiler happy.
-            }
-            
-            Destination destination = jmsOut.getDestination();
+        // if this is a synchronous out-in, prepare to listen on the response destination
+        if (waitForResponse) {
 
             String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
+            if (replyDestName == null && jmsConnectionFactory != null) {
+                replyDestName = jmsConnectionFactory.getReplyToDestination();
+            }
+
             if (replyDestName != null) {
                 if (jmsConnectionFactory != null) {
                     replyDestination = jmsConnectionFactory.getDestination(replyDestName);
@@ -162,107 +179,45 @@
                     replyDestination = jmsOut.getReplyDestination(replyDestName);
                 }
             }
+            replyDestination = JMSUtils.setReplyDestination(
+                replyDestination, messageSender.getSession(), message);
+        }
 
-            if(session == null) {
-               handleException("Could not create JMS session");
-            }
-
-            // now we are going to use the JMS session, but if this was a session from a
-            // defined JMS connection factory, we need to synchronize as sessions are not
-            // thread safe
-            synchronized(session) {
-                // The message property to be used to send the content type is determined by
-                // the out transport info, i.e. either from the EPR if we are sending a request,
-                // or, if we are sending a response, from the configuration of the service that
-                // received the request). The property name can be overridden by a message
-                // context property.
-                String contentTypeProperty =
-                        (String)msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
-                if (contentTypeProperty == null) {
-                    contentTypeProperty = jmsOut.getContentTypeProperty();
-                }
-
-                // convert the axis message context into a JMS Message that we can send over JMS
-                Message message = null;
-                String correlationId = null;
-                try {
-                    message = createJMSMessage(msgCtx, session, contentTypeProperty);
-                } catch (JMSException e) {
-                    handleException("Error creating a JMS message from the axis message context", e);
-                }
-
-                String destinationType = jmsOut.getDestinationType();
-
-                // if the destination does not exist, see if we can create it
-                destination = JMSUtils.createDestinationIfRequired(
-                    destination, destinationType, targetAddress, session);
-
-                if(jmsOut.getReplyDestinationName() != null) {
-                    replyDestination = JMSUtils.createReplyDestinationIfRequired(
-                        replyDestination, jmsOut.getReplyDestinationName(),
-                        jmsOut.getReplyDestinationType(), targetAddress, session);
-                }
-
-                // should we wait for a synchronous response on this same thread?
-                boolean waitForResponse = waitForSynchronousResponse(msgCtx);
-
-                // if this is a synchronous out-in, prepare to listen on the response destination
-                if (waitForResponse) {
-                    replyDestination = JMSUtils.setReplyDestination(
-                        replyDestination, session, message);
-                }
+        try {
+            messageSender.send(message, msgCtx);
+            metrics.incrementMessagesSent();
 
-                // send the outgoing message over JMS to the destination selected
-                try {
-                    JMSUtils.sendMessageToJMSDestination(session, destination, destinationType, message);
+        } catch (AxisJMSException e) {
+            metrics.incrementFaultsSending();
+            handleException("Error sending JMS message", e);
+        }
 
-                    // set the actual MessageID to the message context for use by any others
-                    try {
-                        String msgId = message.getJMSMessageID();
-                        if (msgId != null) {
-                            msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId);
-                        }
-                    } catch (JMSException ignore) {}
+        try {
+            metrics.incrementBytesSent(JMSUtils.getMessageSize(message));
+        } catch (JMSException e) {
+            log.warn("Error reading JMS message size to update transport metrics", e);
+        }
 
-                    metrics.incrementMessagesSent();
-                    try {
-                        metrics.incrementBytesSent(JMSUtils.getMessageSize(message));
-                    } catch (JMSException e) {
-                        log.warn("Error reading JMS message size to update transport metrics", e);
-                    }
-                } catch (BaseTransportException e) {
-                    metrics.incrementFaultsSending();
-                    throw e;
-                }
+        // if we are expecting a synchronous response back for the message sent out
+        if (waitForResponse) {
+            // TODO ********************************************************************************
+            // TODO **** replace with asynchronous polling via a poller task to process this *******
+            // information would be given. Then it should poll (until timeout) the
+            // requested destination for the response message and inject it from a
+            // asynchronous worker thread
+            try {
+                messageSender.getConnection().start();  // multiple calls are safely ignored
+            } catch (JMSException ignore) {}
 
-                // if we are expecting a synchronous response back for the message sent out
-                if (waitForResponse) {
-                    if (connection != null) {
-                        try {
-                            connection.start();
-                        } catch (JMSException ignore) {}
-                    } else {
-                        // If connection is null, we are using a cached session and the underlying
-                        // connection is already started. Thus, there is nothing to do here.
-                    }
-                    try {
-                        correlationId = message.getJMSMessageID();
-                    } catch(JMSException ignore) {}
-                    
-                    // We assume here that the response uses the same message property to
-                    // specify the content type of the message.
-                    waitForResponseAndProcess(session, replyDestination,
-                            jmsOut.getReplyDestinationType(), msgCtx, correlationId,
-                            contentTypeProperty);
-                }
-            }
+            try {
+                correlationId = message.getJMSMessageID();
+            } catch(JMSException ignore) {}
 
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (JMSException ignore) {}
-            }
+            // We assume here that the response uses the same message property to
+            // specify the content type of the message.
+            waitForResponseAndProcess(messageSender.getSession(), replyDestination,
+                msgCtx, correlationId, contentTypeProperty);
+            // TODO ********************************************************************************
         }
     }
 
@@ -278,17 +233,13 @@
      * @throws AxisFault on error
      */
     private void waitForResponseAndProcess(Session session, Destination replyDestination,
-            String replyDestinationType, MessageContext msgCtx, String correlationId,
+            MessageContext msgCtx, String correlationId,
             String contentTypeProperty) throws AxisFault {
 
         try {
             MessageConsumer consumer;
-            if (correlationId != null) {
-                consumer = JMSUtils.createConsumer(session, replyDestination,
-                        "JMSCorrelationID = '" + correlationId + "'");
-            } else {
-                consumer = JMSUtils.createConsumer(session, replyDestination);
-            }
+            consumer = JMSUtils.createConsumer(session, replyDestination,
+                "JMSCorrelationID = '" + correlationId + "'");
 
             // how long are we willing to wait for the sync response
             long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
@@ -332,8 +283,9 @@
 
         } catch (JMSException e) {
             metrics.incrementFaultsReceiving();
-            handleException("Error creating consumer or receiving reply to : " +
-                replyDestination, e);
+            handleException("Error creating a consumer, or receiving a synchronous reply " +
+                "for outgoing MessageContext ID : " + msgCtx.getMessageID() +
+                " and reply Destination : " + replyDestination, e);
         }
     }