You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/12/08 19:15:41 UTC

svn commit: r724432 [3/4] - in /webservices/commons/trunk/modules/transport/modules: base/src/main/java/org/apache/axis2/transport/base/ jms/ jms/src/main/java/org/apache/axis2/transport/jms/ testkit/src/main/java/org/apache/axis2/transport/testkit/tes...

Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java Mon Dec  8 10:15:40 2008
@@ -25,13 +25,14 @@
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.ParameterIncludeImpl;
 import org.apache.axis2.format.TextMessageBuilder;
 import org.apache.axis2.format.TextMessageBuilderAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.axis2.transport.TransportUtils;
 import org.apache.axis2.transport.base.BaseUtils;
+import org.apache.axis2.transport.base.BaseConstants;
+import org.apache.axis2.transport.base.threads.WorkerPool;
 
 import javax.jms.*;
 import javax.jms.Queue;
@@ -55,58 +56,6 @@
     private static final Object[] NOPARMS = new Object[] {};
 
     /**
-     * Create a JMS Queue using the given connection with the JNDI destination name, and return the
-     * JMS Destination name of the created queue
-     *
-     * @param con the JMS Connection to be used
-     * @param destinationJNDIName the JNDI name of the Queue to be created
-     * @return the JMS Destination name of the created Queue
-     * @throws JMSException on error
-     */
-    public static String createJMSQueue(Connection con, String destinationJNDIName)
-            throws JMSException {
-        
-        try {
-            QueueSession session
-                    = ((QueueConnection) con).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue = session.createQueue(destinationJNDIName);
-            log.info("JMS Queue with JNDI name : " + destinationJNDIName + " created");
-            return queue.getQueueName();
-
-        } finally {
-            try {
-                con.close();
-            } catch (JMSException ignore) {}
-        }
-    }
-
-    /**
-     * Create a JMS Topic using the given connection with the JNDI destination name, and return the
-     * JMS Destination name of the created queue
-     *
-     * @param con the JMS Connection to be used
-     * @param destinationJNDIName the JNDI name of the Topic to be created
-     * @return the JMS Destination name of the created Topic
-     * @throws JMSException on error
-     */
-    public static String createJMSTopic(Connection con, String destinationJNDIName)
-            throws JMSException {
-        
-        try {
-            TopicSession session
-                    = ((TopicConnection) con).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-            Topic topic = session.createTopic(destinationJNDIName);
-            log.info("JMS Topic with JNDI name : " + destinationJNDIName + " created");
-            return topic.getTopicName();
-
-        } finally {
-            try {
-                con.close();
-            } catch (JMSException ignore) {}
-        }
-    }
-
-    /**
      * Should this service be enabled over the JMS transport?
      *
      * @param service the Axis service
@@ -131,26 +80,43 @@
      * Get the EPR for the given JMS connection factory and destination
      * the form of the URL is
      * jms:/<destination>?[<key>=<value>&]*
+     * The credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS,
+     * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_PASSWORD are filtered out
      *
-     * @param cf          the Axis2 JMS connection factory
-     * @param destination the JNDI name of the destination
+     * @param cf the Axis2 JMS connection factory
+     * @param destinationType the type of destination
+     * @param endpoint JMSEndpoint
      * @return the EPR as a String
      */
-    // TODO: duplicate code (see JMSConnectionFactory#getEPRForDestination)
-    static String getEPR(JMSConnectionFactory cf, JMSEndpoint endpoint) {
+    static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) {
         StringBuffer sb = new StringBuffer();
-        sb.append(JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName());
-        sb.append("?").append(JMSConstants.DEST_PARAM_TYPE);
-        sb.append("=").append(endpoint.getDestinationType());
-        for (Map.Entry<String,String> entry : cf.getJndiProperties().entrySet()) {
-            sb.append("&").append(entry.getKey()).append("=").append(entry.getValue());
-        }
-        String contentTypeProperty = endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty();
-        if (contentTypeProperty != null) {
-            sb.append("&");
-            sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
-            sb.append("=");
-            sb.append(contentTypeProperty);
+
+        sb.append(
+            JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName());
+        sb.append("?").
+            append(JMSConstants.PARAM_DEST_TYPE).append("=").append(
+            destinationType == JMSConstants.TOPIC ?
+                JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE);
+
+        if (endpoint.getContentTypeRuleSet() != null) {
+            String contentTypeProperty =
+                endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty();
+            if (contentTypeProperty != null) {
+                sb.append("&");
+                sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
+                sb.append("=");
+                sb.append(contentTypeProperty);
+            }
+        }
+
+        for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) {
+            if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) &&
+                !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) &&
+                !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) &&
+                !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) {
+                sb.append("&").append(
+                    entry.getKey()).append("=").append(entry.getValue());
+            }
         }
         return sb.toString();
     }
@@ -188,59 +154,16 @@
     }
 
     /**
-     * Set JNDI properties and any other connection factory parameters to the connection factory
-     * passed in, looking at the parameter in axis2.xml
-     * @param param the axis parameter that holds the connection factory settings
-     * @param jmsConFactory the JMS connection factory to which the parameters should be applied
+     * Set the SOAPEnvelope to the Axis2 MessageContext, from the JMS Message passed in
+     * @param message the JMS message read
+     * @param msgContext the Axis2 MessageContext to be populated
+     * @param contentType content type for the message
+     * @throws AxisFault
+     * @throws JMSException
      */
-    public static void setConnectionFactoryParameters(
-        Parameter param, JMSConnectionFactory jmsConFactory) {
-
-        ParameterIncludeImpl pi = new ParameterIncludeImpl();
-        try {
-            pi.deserializeParameters((OMElement) param.getValue());
-        } catch (AxisFault axisFault) {
-            log.error("Error reading parameters for JMS connection factory" +
-                jmsConFactory.getName(), axisFault);
-        }
-
-        for (Object o : pi.getParameters()) {
-
-            Parameter p = (Parameter) o;
-
-            if (JMSConstants.CONFAC_TYPE.equals(p.getName())) {
-                String connectionFactoryType = (String) p.getValue();
-                jmsConFactory.setConnectionFactoryType(connectionFactoryType);
-
-            } else if (JMSConstants.RECONNECT_TIMEOUT.equals(p.getName())) {
-                String strTimeout = (String) p.getValue();
-                int reconnectTimeoutSeconds = Integer.parseInt(strTimeout);
-                long reconnectTimeoutMillis = reconnectTimeoutSeconds * 1000;
-                jmsConFactory.setReconnectTimeout(reconnectTimeoutMillis);
-
-            } else if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) {
-                jmsConFactory.addJNDIContextProperty(
-                        Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue());
-            } else if (Context.PROVIDER_URL.equals(p.getName())) {
-                jmsConFactory.addJNDIContextProperty(
-                        Context.PROVIDER_URL, (String) p.getValue());
-            } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) {
-                jmsConFactory.addJNDIContextProperty(
-                        Context.SECURITY_PRINCIPAL, (String) p.getValue());
-            } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) {
-                jmsConFactory.addJNDIContextProperty(
-                        Context.SECURITY_CREDENTIALS, (String) p.getValue());
-            } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) {
-                jmsConFactory.setConnFactoryJNDIName((String) p.getValue());
-                jmsConFactory.addJNDIContextProperty(
-                        JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p.getValue());
-            } else {
-                jmsConFactory.addJNDIContextProperty( p.getName(), (String) p.getValue());
-            }
-        }
-    }
+    public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType)
+        throws AxisFault, JMSException {
 
-    public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType) throws AxisFault, JMSException {
         if (message instanceof BytesMessage) {
             if (contentType == null) {
                 log.debug("No content type specified; assuming application/octet-stream.");
@@ -268,6 +191,7 @@
                 return; // Make compiler happy
             }
             msgContext.setEnvelope(envelope);
+
         } else if (message instanceof TextMessage) {
             String type;
             if (contentType == null) {
@@ -288,6 +212,7 @@
                 }
                 builder = new SOAPBuilder();
             }
+
             TextMessageBuilder textMessageBuilder;
             if (builder instanceof TextMessageBuilder) {
                 textMessageBuilder = (TextMessageBuilder)builder;
@@ -312,6 +237,7 @@
      */
     public static Destination setReplyDestination(Destination replyDestination, Session session,
         Message message) {
+
         if (replyDestination == null) {
            try {
                // create temporary queue to receive the reply
@@ -340,116 +266,6 @@
     }
 
     /**
-     * When trying to send a message to a destination, if it does not exist, try to create it
-     *
-     * @param destination the JMS destination to send messages
-     * @param destinationType type of the destination (can be a queue or a topic)
-     * @param targetAddress the target JMS EPR to find the Destination to be created if required
-     * @param session the JMS session to use
-     * @return the JMS Destination where messages could be posted
-     * @throws AxisFault if the target Destination does not exist and cannot be created
-     */
-    public static Destination createDestinationIfRequired(Destination destination,
-        String destinationType, String targetAddress, Session session) throws AxisFault {
-        
-        if (destination == null) {
-            if (targetAddress != null) {
-                String name = JMSUtils.getDestination(targetAddress);
-                if (log.isDebugEnabled()) {
-                    log.debug("Creating JMS Destination : " + name);
-                }
-
-                try {
-                    destination = createDestination(session, name, destinationType);
-                } catch (JMSException e) {
-                    handleException("Error creating destination Queue : " + name, e);
-                }
-            } else {
-                handleException("Cannot send reply to null JMS Destination");
-            }
-        }
-        return destination;
-    }
-
-    /**
-     * If reply destination does not exist, try to create it
-     * 
-     * @param destination the destination queue or topic
-     * @param replyDestinationName name of the reply destination queue or topic
-     * @param destinationType type of the destination (can be queue or topic)
-     * @param targetAddress target address of the queue or topic
-     * @param session JMS session with the message to be sent
-     * @return destination created if the destination is null or the destination otherwise
-     * @throws org.apache.axis2.AxisFault in case of an error in creating the destination
-     */
-    public static Destination createReplyDestinationIfRequired(Destination destination,
-        String replyDestinationName, String destinationType, String targetAddress, Session session)
-            throws AxisFault {
-        
-        if (destination == null) {
-            if (targetAddress != null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Creating JMS Reply Destination : " + replyDestinationName);
-                }
-
-                try {
-                    destination = createDestination(session, replyDestinationName, destinationType);
-                } catch (JMSException e) {
-                    handleException("Error creating reply destination : "
-                            + replyDestinationName, e);
-                }
-            } else {
-                handleException("Cannot send reply to null reply JMS Destination");
-            }
-        }
-        return destination;
-    }
-
-    /**
-     * Send the given message to the Destination using the given session
-     * 
-     * @param session the session to use to send
-     * @param destination the Destination
-     * @param destinationType type of the destination (can be a queue or a topic)
-     * @param message the JMS Message
-     * @throws AxisFault on error
-     */
-    public static void sendMessageToJMSDestination(Session session,
-        Destination destination, String destinationType, Message message) throws AxisFault {
-
-        MessageProducer producer = null;
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug("Sending message to destination : " + destination);
-            }
-
-            if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType)) {
-                producer = ((TopicSession) session).createPublisher((Topic) destination);
-                ((TopicPublisher) producer).publish(message);
-            } else {                
-                producer = ((QueueSession) session).createSender((Queue) destination);
-                producer.send(message);
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("Sent message to destination : " + destination +
-                    "\nMessage ID : " + message.getJMSMessageID() +
-                    "\nCorrelation ID : " + message.getJMSCorrelationID() +
-                    "\nReplyTo ID : " + message.getJMSReplyTo());
-            }
-
-        } catch (JMSException e) {
-            handleException("Error creating a producer or sending to : " + destination, e);
-        } finally {
-            if (producer != null) {
-                try {
-                    producer.close();
-                } catch (JMSException ignore) {}
-            }
-        }
-    }
-
-    /**
      * Set transport headers from the axis message context, into the JMS message
      *
      * @param msgContext the axis message context
@@ -485,19 +301,21 @@
                 } else {
                     log.warn("Invalid delivery mode ignored : " + o);
                 }
+
             } else if (JMSConstants.JMS_EXPIRATION.equals(name)) {
                 message.setJMSExpiration(
-                        Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION)));
+                    Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION)));
             } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) {
                 message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID));
             } else if (JMSConstants.JMS_PRIORITY.equals(name)) {
                 message.setJMSPriority(
-                        Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY)));
+                    Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY)));
             } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) {
                 message.setJMSTimestamp(
-                        Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP)));
+                    Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP)));
             } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) {
                 message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE));
+
             } else {
                 Object value = headerMap.get(name);
                 if (value instanceof String) {
@@ -644,80 +462,14 @@
     }
 
 
-    // ----------- JMS 1.0.2b compatibility methods -------------
-    public static Connection createConnection(ConnectionFactory conFactory, String user,
-        String pass, String destinationType) throws JMSException {
-
-        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ) {
-            if (user != null && pass != null) {
-                return ((QueueConnectionFactory) conFactory).createQueueConnection(user, pass);
-            } else {
-                return ((QueueConnectionFactory) conFactory).createQueueConnection();
-            }
-            
-        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
-            if (user != null && pass != null) {
-                return ((TopicConnectionFactory) conFactory).createTopicConnection(user, pass);
-            } else {
-                return ((TopicConnectionFactory) conFactory).createTopicConnection();
-            }
-        } else {
-            handleException("Unable to determine type of JMS Connection Factory - i.e Queue/Topic");
-        }
-        return null;
-    }
-
-    public static Session createSession(Connection con,
-        boolean transacted, int acknowledgeMode, String destinationType) throws JMSException {
-
-        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ) {
-            return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode);
-        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
-            return ((TopicConnection) con).createTopicSession(transacted, acknowledgeMode);
-        } else {
-            log.debug("JMS destination type not given or invalid, was '" + destinationType +
-                    "'. Taking the default value as queue");
-            return ((QueueConnection) con).createQueueSession(transacted, acknowledgeMode);
-        }
-    }
-
-    public static Destination createDestination(Session session, String destName,
-        String destinationType) throws JMSException {
-
-        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
-            return session.createQueue(destName);
-        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
-            return session.createTopic(destName);
-        } else {
-            log.debug("JMS destination type not given or invalid, was '" + destinationType +
-                    "'. Taking the default value as queue");
-            return session.createQueue(destName);          
-        }
-    }
-
-    public static void createDestination(ConnectionFactory conFactory,
-        String destinationJNDIName, String destinationType) throws JMSException {
-
-        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType)) {
-            JMSUtils.createJMSQueue(
-                ((QueueConnectionFactory) conFactory).createQueueConnection(),
-                destinationJNDIName);
-        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ) {
-            JMSUtils.createJMSTopic(
-                ((TopicConnectionFactory) conFactory).createTopicConnection(),
-                destinationJNDIName);
-        }
-    }
-    public static MessageConsumer createConsumer(Session session, Destination dest)
-        throws JMSException {
-
-        if (dest instanceof Queue) {
-            return ((QueueSession) session).createReceiver((Queue) dest);
-        } else {
-            return ((TopicSession) session).createSubscriber((Topic) dest);
-        }
-    }
-
+    /**
+     * Create a MessageConsumer for the given Destination
+     * @param session JMS Session to use
+     * @param dest Destination for which the Consumer is to be created
+     * @param messageSelector the message selector to be used if any
+     * @return a MessageConsumer for the specified Destination
+     * @throws JMSException
+     */
     public static MessageConsumer createConsumer(Session session, Destination dest, String messageSelector)
         throws JMSException {
 
@@ -728,6 +480,13 @@
         }
     }
 
+    /**
+     * Create a temp queue or topic for synchronous receipt of responses, when a reply destination
+     * is not specified
+     * @param session the JMS Session to use
+     * @return a temporary Queue or Topic, depending on the session
+     * @throws JMSException
+     */
     public static Destination createTemporaryDestination(Session session) throws JMSException {
 
         if (session instanceof QueueSession) {
@@ -737,6 +496,11 @@
         }
     }
 
+    /**
+     * Return the body length in bytes for a bytes message
+     * @param bMsg the JMS BytesMessage
+     * @return length of body in bytes
+     */
     public static long getBodyLength(BytesMessage bMsg) {
         try {
             Method mtd = bMsg.getClass().getMethod("getBodyLength", NOARGS);
@@ -762,7 +526,13 @@
         } catch (JMSException ignore) {}
         return length;
     }
-    
+
+    /**
+     * Get the length of the message in bytes
+     * @param message
+     * @return message size (or approximation) in bytes
+     * @throws JMSException
+     */
     public static long getMessageSize(Message message) throws JMSException {
         if (message instanceof BytesMessage) {
             return JMSUtils.getBodyLength((BytesMessage) message);
@@ -799,4 +569,525 @@
             }
         }
     }
+
+    /**
+     * Create a ServiceTaskManager for the service passed in and its corresponding JMSConnectionFactory
+     *
+     * The String valued service parameters and the connection factory parameters are both
+     * copied into the task manager's JMS properties (connection factory first, then service).
+     * Each individual setting is looked up at the service level first and then at the
+     * connection factory level. The destination JNDI name falls back to the service name when
+     * JMSConstants.PARAM_DESTINATION is not set. Int and double valued settings are applied only
+     * when present. Finally the listed processed parameters are removed from the property bag.
+     *
+     * NOTE(review): JMSConstants.PARAM_DEST_TYPE and JMSConstants.PARAM_SESSION_ACK are read by
+     * the lookups used here but are not among the properties removed at the end - confirm
+     * whether that is intended.
+     * NOTE(review): the optional boolean lookups return null when a parameter is absent;
+     * presumably the corresponding setters accept a null Boolean - verify against
+     * ServiceTaskManager.
+     *
+     * @param jcf the JMS connection factory definition whose parameters act as defaults
+     * @param service the Axis service; supplies the service name and service level parameters
+     * @param workerPool the worker pool handed to the created task manager
+     * @return the newly created and configured ServiceTaskManager
+     */
+    public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf,
+        AxisService service, WorkerPool workerPool) {
+
+        String name = service.getName();
+        Map<String, String> svc = getServiceStringParameters(service.getParameters());
+        Map<String, String> cf  = jcf.getParameters();
+
+        ServiceTaskManager stm = new ServiceTaskManager();
+
+        stm.setServiceName(name);
+        stm.addJmsProperties(cf);
+        stm.addJmsProperties(svc);
+
+        stm.setConnFactoryJNDIName(
+            getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf));
+        String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf);
+        if (destName == null) {
+            destName = service.getName();
+        }
+        stm.setDestinationJNDIName(destName);
+        stm.setDestinationType(getDestinationType(svc, cf));
+
+        stm.setJmsSpec11(
+            getJMSSpecVersion(svc, cf));
+        stm.setTransactionality(
+            getTransactionality(svc, cf));
+        stm.setCacheUserTransaction(
+            getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf));
+        stm.setUserTransactionJNDIName(
+            getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf));
+        stm.setSessionTransacted(
+            getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf));
+        stm.setSessionAckMode(
+            getSessionAck(svc, cf));
+        stm.setMessageSelector(
+            getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf));
+        stm.setSubscriptionDurable(
+            getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf));
+        stm.setDurableSubscriberName(
+            getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf));
+
+        stm.setCacheLevel(
+            getCacheLevel(svc, cf));
+        stm.setPubSubNoLocal(
+            getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf));
+
+        Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf);
+        if (value != null) {
+            stm.setReceiveTimeout(value);
+        }
+        value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf);
+        if (value != null) {
+            stm.setConcurrentConsumers(value);
+        }
+        value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf);
+        if (value != null) {
+            stm.setMaxConcurrentConsumers(value);
+        }
+        value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf);
+        if (value != null) {
+            stm.setIdleTaskExecutionLimit(value);
+        }
+        value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf);
+        if (value != null) {
+            stm.setMaxMessagesPerTask(value);
+        }
+
+        value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf);
+        if (value != null) {
+            stm.setInitialReconnectDuration(value);
+        }
+        value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf);
+        if (value != null) {
+            stm.setMaxReconnectDuration(value);
+        }
+        Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf);
+        if (dValue != null) {
+            stm.setReconnectionProgressionFactor(dValue);
+        }
+
+        stm.setWorkerPool(workerPool);
+
+        // remove processed properties from property bag
+        stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME);
+        stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION);
+        stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER);
+        stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY);
+        stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN);
+        stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME);
+        stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED);
+        stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR);
+        stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE);
+        stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME);
+        stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL);
+        stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL);
+        stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT);
+        stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS);
+        stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS);
+        stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT);
+        stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK);
+        stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION);
+        stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION);
+        stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR);
+
+        return stm;
+    }
+
+    /**
+     * Collect the String valued parameters from a list of service parameters. Parameters
+     * whose value is not a String are skipped.
+     * @param list the service parameters; every element is cast to Parameter
+     * @return a new map from parameter name to its String value
+     */
+    private static Map<String, String> getServiceStringParameters(List list) {
+
+        Map<String, String> stringParams = new HashMap<String, String>();
+        for (Object element : list) {
+            Parameter param = (Parameter) element;
+            Object paramValue = param.getValue();
+            if (!(paramValue instanceof String)) {
+                continue;
+            }
+            stringParams.put(param.getName(), (String) paramValue);
+        }
+        return stringParams;
+    }
+
+    /**
+     * Look up a required String property, preferring the service level value over the
+     * connection factory level one.
+     * @param key the name of the property
+     * @param svcMap the service level parameters
+     * @param cfMap the connection factory level parameters
+     * @return the value found, never null
+     * @throws AxisJMSException if the property is defined in neither map
+     */
+    private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) {
+        String value = (String) svcMap.get(key);
+        if (value == null) {
+            value = (String) cfMap.get(key);
+        }
+        if (value == null) {
+            // say what is wrong: the previous message only named the key
+            throw new AxisJMSException(
+                "Required service/connection factory property not found : " + key);
+        }
+        return value;
+    }
+
+    /**
+     * Look up an optional String property, preferring the service level value over the
+     * connection factory level one.
+     * @param key the name of the property
+     * @param svcMap the service level parameters
+     * @param cfMap the connection factory level parameters
+     * @return the value found, or null if neither map defines the property
+     */
+    private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) {
+        String svcValue = (String) svcMap.get(key);
+        return svcValue != null ? svcValue : (String) cfMap.get(key);
+    }
+
+    /**
+     * Look up an optional boolean property, preferring the service level value over the
+     * connection factory level one.
+     * @param key the name of the property
+     * @param svcMap the service level parameters
+     * @param cfMap the connection factory level parameters
+     * @return the value converted with Boolean.valueOf, or null if neither map defines it
+     */
+    private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) {
+        String text = (String) svcMap.get(key);
+        if (text == null) {
+            text = (String) cfMap.get(key);
+        }
+        return text == null ? null : Boolean.valueOf(text);
+    }
+
+    /**
+     * Look up an optional integer property, preferring the service level value over the
+     * connection factory level one.
+     * @param key the name of the property
+     * @param svcMap the service level parameters
+     * @param cfMap the connection factory level parameters
+     * @return the parsed value, or null if neither map defines the property
+     * @throws AxisJMSException if the value found is not a valid integer
+     */
+    private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) {
+        String text = (String) svcMap.get(key);
+        if (text == null) {
+            text = (String) cfMap.get(key);
+        }
+        if (text == null) {
+            return null;
+        }
+        try {
+            return Integer.valueOf(text);
+        } catch (NumberFormatException e) {
+            throw new AxisJMSException("Invalid value : " + text + " for " + key);
+        }
+    }
+
+    /**
+     * Look up an optional double property, preferring the service level value over the
+     * connection factory level one.
+     * @param key the name of the property
+     * @param svcMap the service level parameters
+     * @param cfMap the connection factory level parameters
+     * @return the parsed value, or null if neither map defines the property
+     * @throws AxisJMSException if the value found is not a valid double
+     */
+    private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) {
+        String text = (String) svcMap.get(key);
+        if (text == null) {
+            text = (String) cfMap.get(key);
+        }
+        if (text == null) {
+            return null;
+        }
+        try {
+            return Double.valueOf(text);
+        } catch (NumberFormatException e) {
+            throw new AxisJMSException("Invalid value : " + text + " for " + key);
+        }
+    }
+
+    /**
+     * Resolve the transactionality configured for a service, preferring the service level
+     * value over the connection factory level one. Values are matched ignoring case.
+     * @param svcMap the service level parameters
+     * @param cfMap the connection factory level parameters
+     * @return BaseConstants.TRANSACTION_NONE when the parameter is not set, otherwise
+     * BaseConstants.TRANSACTION_JTA or BaseConstants.TRANSACTION_LOCAL as configured
+     * @throws AxisJMSException if the configured value is not a recognized option
+     */
+    private static int getTransactionality(Map svcMap, Map cfMap) {
+
+        String key = BaseConstants.PARAM_TRANSACTIONALITY;
+        String val = (String) svcMap.get(key);
+        if (val == null) {
+            val = (String) cfMap.get(key);
+        }
+
+        if (val == null) {
+            return BaseConstants.TRANSACTION_NONE;
+
+        } else {
+            if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) {
+                return BaseConstants.TRANSACTION_JTA;
+            } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) {
+                return BaseConstants.TRANSACTION_LOCAL;
+            } else {
+                // report the parameter name (the lookup key), not one of its legal values
+                throw new AxisJMSException("Invalid option : " + val + " for parameter : " +
+                    key);
+            }
+        }
+    }
+
+    /**
+     * Resolve the destination type for a service, preferring the service level value over
+     * the connection factory level one.
+     * @param svcMap the service level parameters
+     * @param cfMap the connection factory level parameters
+     * @return JMSConstants.TOPIC if the configured value equals (ignoring case)
+     * JMSConstants.DESTINATION_TYPE_TOPIC, otherwise (including when unset) JMSConstants.QUEUE
+     */
+    private static int getDestinationType(Map svcMap, Map cfMap) {
+
+        String configured = (String) svcMap.get(JMSConstants.PARAM_DEST_TYPE);
+        if (configured == null) {
+            configured = (String) cfMap.get(JMSConstants.PARAM_DEST_TYPE);
+        }
+
+        boolean topic = JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(configured);
+        return topic ? JMSConstants.TOPIC : JMSConstants.QUEUE;
+    }
+
+    /**
+     * Resolve the session acknowledgement mode. The value is looked up in svcMap first and,
+     * when absent there, in cfMap. The symbolic names are matched case insensitively; any
+     * other value is interpreted as the numeric acknowledgement mode.
+     *
+     * @param svcMap the map consulted first
+     * @param cfMap the map consulted when svcMap holds no value
+     * @return the acknowledgement mode; Session.AUTO_ACKNOWLEDGE when no value is set
+     * @throws AxisJMSException when the value is neither a known name nor an integer
+     */
+    private static int getSessionAck(Map svcMap, Map cfMap) {
+
+        String key = JMSConstants.PARAM_SESSION_ACK;
+        String val = (String) svcMap.get(key);
+        if (val == null) {
+            val = (String) cfMap.get(key);
+        }
+
+        if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+            return Session.AUTO_ACKNOWLEDGE;
+        } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+            return Session.CLIENT_ACKNOWLEDGE;
+        } else if ("DUPS_OK_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+            // matched case insensitively, consistent with the other mode names
+            return Session.DUPS_OK_ACKNOWLEDGE;
+        } else if ("SESSION_TRANSACTED".equalsIgnoreCase(val)) {
+            return 0; //Session.SESSION_TRANSACTED;
+        } else {
+            try {
+                return Integer.parseInt(val);
+            } catch (NumberFormatException e) {
+                throw new AxisJMSException("Invalid session acknowledgement mode : " + val);
+            }
+        }
+    }
+
+    /**
+     * Resolve the cache level. The value is looked up in svcMap first and, when absent there,
+     * in cfMap. All level names are matched case insensitively.
+     *
+     * @param svcMap the map consulted first
+     * @param cfMap the map consulted when svcMap holds no value
+     * @return the JMSConstants.CACHE_* value for the configured level, or
+     *         JMSConstants.CACHE_AUTO when no value is set
+     * @throws AxisJMSException when a value is set but is not a known cache level name
+     */
+    private static int getCacheLevel(Map svcMap, Map cfMap) {
+
+        String key = JMSConstants.PARAM_CACHE_LEVEL;
+        String val = (String) svcMap.get(key);
+        if (val == null) {
+            val = (String) cfMap.get(key);
+        }
+
+        if (val == null) {
+            return JMSConstants.CACHE_AUTO;
+        } else if ("none".equalsIgnoreCase(val)) {
+            return JMSConstants.CACHE_NONE;
+        } else if ("connection".equalsIgnoreCase(val)) {
+            return JMSConstants.CACHE_CONNECTION;
+        } else if ("session".equalsIgnoreCase(val)) {
+            // matched case insensitively, consistent with "none" and "connection"
+            return JMSConstants.CACHE_SESSION;
+        } else if ("consumer".equalsIgnoreCase(val)) {
+            return JMSConstants.CACHE_CONSUMER;
+        } else {
+            throw new AxisJMSException("Invalid cache level : " + val);
+        }
+    }
+
+    /**
+     * Decide whether the JMS 1.1 API is selected. The value is looked up in svcMap first
+     * and, when absent there, in cfMap.
+     *
+     * @param svcMap the map consulted first
+     * @param cfMap the map consulted when svcMap holds no value
+     * @return true when no value is set or the value is exactly "1.1", false otherwise
+     */
+    private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) {
+
+        String specVersion = (String) svcMap.get(JMSConstants.PARAM_JMS_SPEC_VER);
+        if (specVersion == null) {
+            specVersion = (String) cfMap.get(JMSConstants.PARAM_JMS_SPEC_VER);
+        }
+
+        return specVersion == null || "1.1".equals(specVersion);
+    }
+
+    /**
+     * This is a JMS spec independent method to create a Connection. Please be cautious when
+     * making any changes
+     *
+     * The generic ConnectionFactory API is used when jmsSpec11 is true or isQueue is null;
+     * otherwise the factory is cast to the Queue or Topic specific factory type as selected
+     * by isQueue. Credentials are only passed on when both user and pass are non-null.
+     *
+     * @param conFac the ConnectionFactory to use
+     * @param user optional user name
+     * @param pass optional password
+     * @param jmsSpec11 should we use JMS 1.1 API ?
+     * @param isQueue is this to deal with a Queue? (null selects the generic API)
+     * @return a JMS Connection as requested
+     * @throws JMSException on errors, to be handled and logged by the caller
+     */
+    public static Connection createConnection(ConnectionFactory conFac,
+        String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException {
+
+        if (log.isDebugEnabled()) {
+            // never write the password to the log
+            log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") +
+                "Connection for user : " + user);
+        }
+
+        boolean hasCredentials = user != null && pass != null;
+
+        if (jmsSpec11 || isQueue == null) {
+            return hasCredentials ?
+                conFac.createConnection(user, pass) : conFac.createConnection();
+
+        } else if (isQueue) {
+            // a Queue must be served through the QueueConnectionFactory
+            QueueConnectionFactory qConFac = (QueueConnectionFactory) conFac;
+            return hasCredentials ?
+                qConFac.createQueueConnection(user, pass) : qConFac.createQueueConnection();
+
+        } else {
+            TopicConnectionFactory tConFac = (TopicConnectionFactory) conFac;
+            return hasCredentials ?
+                tConFac.createTopicConnection(user, pass) : tConFac.createTopicConnection();
+        }
+    }
+
+    /**
+     * Create a Session in a JMS spec independent manner. Please be cautious when making any
+     * changes
+     *
+     * The generic Connection API is used when jmsSpec11 is true or isQueue is null; otherwise
+     * the connection is cast to a QueueConnection or TopicConnection as selected by isQueue.
+     *
+     * @param connection the JMS Connection
+     * @param transacted should the session be transacted?
+     * @param ackMode the ACK mode for the session
+     * @param jmsSpec11 should we use the JMS 1.1 API?
+     * @param isQueue is this Session to deal with a Queue?
+     * @return a Session created for the given information
+     * @throws JMSException on errors, to be handled and logged by the caller
+     */
+    public static Session createSession(Connection connection, boolean transacted, int ackMode,
+        boolean jmsSpec11, Boolean isQueue) throws JMSException {
+
+        boolean useGenericApi = jmsSpec11 || isQueue == null;
+        if (useGenericApi) {
+            return connection.createSession(transacted, ackMode);
+        }
+
+        if (isQueue) {
+            QueueConnection queueConnection = (QueueConnection) connection;
+            return queueConnection.createQueueSession(transacted, ackMode);
+        }
+
+        TopicConnection topicConnection = (TopicConnection) connection;
+        return topicConnection.createTopicSession(transacted, ackMode);
+    }
+
+    /**
+     * Create a MessageConsumer in a JMS spec independent manner. Please be cautious when
+     * making any changes
+     *
+     * The generic Session API is used when jmsSpec11 is true or isQueue is null; otherwise
+     * the session is cast to a QueueSession or TopicSession as selected by isQueue.
+     *
+     * @param session JMS session
+     * @param destination the Destination
+     * @param isQueue is the Destination a queue?
+     * @param subscriberName optional client name to use for a durable subscription to a topic
+     * @param messageSelector optional message selector
+     * @param pubSubNoLocal should we receive messages sent by us during pub-sub?
+     * @param isDurable is this a durable topic subscription?
+     * @param jmsSpec11 should we use JMS 1.1 API ?
+     * @return a MessageConsumer to receive messages
+     * @throws JMSException on errors, to be handled and logged by the caller
+     */
+    public static MessageConsumer createConsumer(
+        Session session, Destination destination, Boolean isQueue,
+        String subscriberName, String messageSelector, boolean pubSubNoLocal,
+        boolean isDurable, boolean jmsSpec11) throws JMSException {
+
+        boolean useGenericApi = jmsSpec11 || isQueue == null;
+
+        if (useGenericApi) {
+            if (!isDurable) {
+                return session.createConsumer(destination, messageSelector, pubSubNoLocal);
+            }
+            return session.createDurableSubscriber(
+                (Topic) destination, subscriberName, messageSelector, pubSubNoLocal);
+        }
+
+        if (isQueue) {
+            QueueSession queueSession = (QueueSession) session;
+            return queueSession.createReceiver((Queue) destination, messageSelector);
+        }
+
+        TopicSession topicSession = (TopicSession) session;
+        Topic topic = (Topic) destination;
+        if (!isDurable) {
+            return topicSession.createSubscriber(topic, messageSelector, pubSubNoLocal);
+        }
+        return topicSession.createDurableSubscriber(
+            topic, subscriberName, messageSelector, pubSubNoLocal);
+    }
+
+    /**
+     * Create a MessageProducer in a JMS spec independent manner. Please be cautious when
+     * making any changes
+     *
+     * The generic Session API is used when jmsSpec11 is true or isQueue is null; otherwise
+     * the session is cast to a QueueSession or TopicSession as selected by isQueue.
+     *
+     * @param session JMS session
+     * @param destination the Destination
+     * @param isQueue is the Destination a queue?
+     * @param jmsSpec11 should we use JMS 1.1 API ?
+     * @return a MessageProducer to send messages to the given Destination
+     * @throws JMSException on errors, to be handled and logged by the caller
+     */
+    public static MessageProducer createProducer(
+        Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException {
+
+        boolean useGenericApi = jmsSpec11 || isQueue == null;
+        if (useGenericApi) {
+            return session.createProducer(destination);
+        }
+
+        if (isQueue) {
+            QueueSession queueSession = (QueueSession) session;
+            return queueSession.createSender((Queue) destination);
+        }
+
+        TopicSession topicSession = (TopicSession) session;
+        return topicSession.createPublisher((Topic) destination);
+    }
+
+    /**
+     * Create a one time MessageProducer for the given JMS OutTransport information
+     * For simplicity and best compatibility, this method uses only JMS 1.0.2b API.
+     * Please be cautious when making any changes
+     *
+     * The connection is created from the Queue or Topic specific factory when the destination
+     * type of jmsOut is recognized; otherwise it is taken from the JMSConnectionFactory of
+     * jmsOut, if one is available.
+     *
+     * @param jmsOut the JMS OutTransport information (contains all properties)
+     * @return a JMSSender based on one-time use resources
+     * @throws JMSException on errors, to be handled and logged by the caller. This includes
+     *         the case where no connection could be obtained at all
+     */
+    public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut)
+        throws JMSException {
+
+        // digest the targetAddress and locate CF from the EPR
+        jmsOut.loadConnectionFactoryFromProperies();
+
+        // create a one time connection and session to be used
+        Hashtable<String,String> jmsProps = jmsOut.getProperties();
+        String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null;
+        String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null;
+
+        QueueConnectionFactory qConFac = null;
+        TopicConnectionFactory tConFac = null;
+
+        // -1 marks a destination type that is neither queue nor topic
+        int destType = -1;
+        if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
+            destType = JMSConstants.QUEUE;
+            qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
+
+        } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
+            destType = JMSConstants.TOPIC;
+            tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
+        }
+
+        Connection connection = null;
+        if (user != null && pass != null) {
+            if (qConFac != null) {
+                connection = qConFac.createQueueConnection(user, pass);
+            } else if (tConFac != null) {
+                connection = tConFac.createTopicConnection(user, pass);
+            }
+        } else {
+            if (qConFac != null) {
+                connection = qConFac.createQueueConnection();
+            } else if (tConFac != null)  {
+                connection = tConFac.createTopicConnection();
+            }
+        }
+
+        // fall back to the connection held by the JMSConnectionFactory, if any
+        if (connection == null && jmsOut.getJmsConnectionFactory() != null) {
+            connection = jmsOut.getJmsConnectionFactory().getConnection();
+        }
+
+        // fail with the declared exception type instead of a NullPointerException below
+        if (connection == null) {
+            throw new JMSException("Unable to create a JMS connection for destination type : " +
+                jmsOut.getDestinationType());
+        }
+
+        Session session = null;
+        MessageProducer producer = null;
+        Destination destination = jmsOut.getDestination();
+
+        // any destination type other than queue is handled through the Topic API
+        if (destType == JMSConstants.QUEUE) {
+            session = ((QueueConnection) connection).
+                createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            producer = ((QueueSession) session).createSender((Queue) destination);
+        } else {
+            session = ((TopicConnection) connection).
+                createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+            producer = ((TopicSession) session).createPublisher((Topic) destination);
+        }
+
+        return new JMSMessageSender(connection, session, producer,
+            destination, (jmsOut.getJmsConnectionFactory() == null ?
+            JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false,
+            destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * Map a destination type indicator to a human readable name
+     * @param destType the destination type indicator int
+     * @return "Queue" for JMSConstants.QUEUE, "Topic" for JMSConstants.TOPIC, and "Generic"
+     *         for any other value
+     */
+    public static String getDestinationTypeAsString(int destType) {
+        if (destType == JMSConstants.QUEUE) {
+            return "Queue";
+        }
+        if (destType == JMSConstants.TOPIC) {
+            return "Topic";
+        }
+        return "Generic";
+    }
 }