You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/12/08 19:15:41 UTC

svn commit: r724432 [1/4] - in /webservices/commons/trunk/modules/transport/modules: base/src/main/java/org/apache/axis2/transport/base/ jms/ jms/src/main/java/org/apache/axis2/transport/jms/ testkit/src/main/java/org/apache/axis2/transport/testkit/tes...

Author: asankha
Date: Mon Dec  8 10:15:40 2008
New Revision: 724432

URL: http://svn.apache.org/viewvc?rev=724432&view=rev
Log:
merging JMS transport enhancements from branch webservices/commons/trunk/scratch/asankha/
supports JTA and JMS local transactions
supports dynamic scaling and many advanced JMS options


Added:
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
Modified:
    webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
    webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java
    webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
    webservices/commons/trunk/modules/transport/modules/jms/pom.xml
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
    webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java
    webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java
    webservices/commons/trunk/modules/transport/modules/tests/log4j.properties
    webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java
    webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java
    webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java
    webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java

Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java (original)
+++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java Mon Dec  8 10:15:40 2008
@@ -171,6 +171,14 @@
         }
         return result;
     }
+
+    public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+        return getEPRsForService(serviceName);
+    }
+
+    protected EndpointReference[] getEPRsForService(String serviceName) {
+        return null;
+    }
     
     private boolean ignoreService(AxisService service) {
         return service.getName().startsWith("__"); // these are "private" services

Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java (original)
+++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java Mon Dec  8 10:15:40 2008
@@ -32,6 +32,7 @@
 import org.apache.axis2.description.TransportInDescription;
 import org.apache.axis2.description.WSDL2Constants;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.axiom.om.util.UUIDGenerator;
 
 import javax.management.MBeanServer;
@@ -58,6 +59,13 @@
     private int state = BaseConstants.STOPPED;
 
     /**
+     * A constructor that makes subclasses pick up the correct logger
+     */
+    protected AbstractTransportSender() {
+        log = LogFactory.getLog(this.getClass());
+    }
+
+    /**
      * Initialize the generic transport sender.
      *
      * @param cfgCtx the axis configuration context

Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java (original)
+++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java Mon Dec  8 10:15:40 2008
@@ -96,4 +96,31 @@
     // this is an property required by axis2
     // FIXME: where is this required in Axis2?
     public final static String MAIL_CONTENT_TYPE = "mail.contenttype";
+
+    /** Service transaction level - non-transactional */
+    public static final int TRANSACTION_NONE  = 0;
+    /** Service transaction level - use non-JTA (i.e. local) transactions */
+    public static final int TRANSACTION_LOCAL = 1;
+    /** Service transaction level - use JTA transactions */
+    public static final int TRANSACTION_JTA   = 2;
+    /** Service transaction level - non-transactional */
+    public static final String STR_TRANSACTION_NONE  = "none";
+    /** Service transaction level - use non-JTA (i.e. local) transactions */
+    public static final String STR_TRANSACTION_LOCAL = "local";
+    /** Service transaction level - use JTA transactions */
+    public static final String STR_TRANSACTION_JTA   = "jta";
+
+    /** The Parameter name indicating the transactionality of a service */
+    public static final String PARAM_TRANSACTIONALITY = "transport.Transactionality";
+    /** Parameter name indicating the JNDI name to get a UserTransaction from JNDI */
+    public static final String PARAM_USER_TXN_JNDI_NAME = "transport.UserTxnJNDIName";
+    /** Parameter that indicates if a UserTransaction reference could be cached - default yes */
+    public static final String PARAM_CACHE_USER_TXN = "transport.CacheUserTxn";
+
+    /** The UserTransaction associated with this message */
+    public static final String USER_TRANSACTION = "UserTransaction";
+    /** A message level property indicating a request to rollback the transaction associated with the message */
+    public static final String SET_ROLLBACK_ONLY = "SET_ROLLBACK_ONLY";
+    /** A message level property indicating a commit is required after the next immediate send over a transport */
+    public static final String JTA_COMMIT_AFTER_SEND = "JTA_COMMIT_AFTER_SEND";    
 }

Modified: webservices/commons/trunk/modules/transport/modules/jms/pom.xml
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/pom.xml?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/pom.xml (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/pom.xml Mon Dec  8 10:15:40 2008
@@ -76,6 +76,12 @@
             <artifactId>geronimo-jms_1.1_spec</artifactId>
             <version>${jms-1.1-spec.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
+            <version>${jta-spec.version}</version>
+        </dependency>
         
         <dependency>
             <groupId>junit</groupId>
@@ -95,6 +101,7 @@
         <commons.logging.version>1.1</commons.logging.version>
         <axis2-transport-base.version>SNAPSHOT</axis2-transport-base.version>
         <jms-1.1-spec.version>1.1</jms-1.1-spec.version>
+        <jta-spec.version>1.0</jta-spec.version>
     </properties>
 
 </project>
\ No newline at end of file

Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java Mon Dec  8 10:15:40 2008
@@ -21,6 +21,10 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.transport.base.BaseUtils;
 import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterIncludeImpl;
+import org.apache.axis2.AxisFault;
+import org.apache.axiom.om.OMElement;
 
 import javax.jms.*;
 import javax.naming.Context;
@@ -33,503 +37,358 @@
 
 /**
  * Encapsulate a JMS Connection factory definition within an Axis2.xml
- * <p/>
- * More than one JMS connection factory could be defined within an Axis2 XML
- * specifying the JMSListener as the transportReceiver.
- * <p/>
- * These connection factories are created at the initialization of the
- * transportReceiver, and any service interested in using any of these could
- * specify the name of the factory and the destination through Parameters named
- * JMSConstants.CONFAC_PARAM and JMSConstants.DEST_PARAM as shown below.
- * <p/>
- * <parameter name="transport.jms.ConnectionFactory" locked="true">myQueueConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="true">TestQueue</parameter>
- * <p/>
- * If a connection factory is defined by a parameter named
- * JMSConstants.DEFAULT_CONFAC_NAME in the Axis2 XML, services which does not
- * explicitly specify a connection factory will be defaulted to it - if it is
- * defined in the Axis2 configuration.
- * <p/>
- * e.g.
- * <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
- * <parameter name="myTopicConnectionFactory" locked="false">
- * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
- * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
- * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="false">myTopicOne, myTopicTwo</parameter>
- * </parameter>
- * <parameter name="myQueueConnectionFactory" locked="false">
- * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
- * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
- * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="false">myQueueOne, myQueueTwo</parameter>
- * </parameter>
- * <parameter name="default" locked="false">
- * <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
- * <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
- * <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">ConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="false">myDestinationOne, myDestinationTwo</parameter>
- * </parameter>
- * </transportReceiver>
+ *
+ * JMS Connection Factory definitions, allows JNDI properties as well as other service
+ * level parameters to be defined, and re-used by each service that binds to it
+ *
+ * When used for sending messages out, JMSConnectionFactory instances are able to cache
+ * a Connection, Session or Producer
  */
-public class JMSConnectionFactory implements ExceptionListener {
+public class JMSConnectionFactory {
 
     private static final Log log = LogFactory.getLog(JMSConnectionFactory.class);
 
     /** The name used for the connection factory definition within Axis2 */
     private String name = null;
-    /** The JMS transport listener instance. */
-    private final JMSListener jmsListener;
-    /** The worker pool to use. */
-    private final WorkerPool workerPool;
-    /** The JNDI name of the actual connection factory */
-    private String connFactoryJNDIName = null;
-    /** Map of destination JNDI names to endpoints */
-    private Map<String,JMSEndpoint> endpointJNDINameMapping = null;
-    /** JMS Sessions currently active. One session for each Destination / Service */
-    private Map<String,Session> jmsSessions = null;
-    /** Properties of the connection factory to acquire the initial context */
-    private Hashtable<String,String> jndiProperties = null;
-    /** The JNDI Context used - created using the properties */
+    /** The list of parameters from the axis2.xml definition */
+    private Hashtable<String, String> parameters = new Hashtable<String, String>();
+
+    /** The cached InitialContext reference */
     private Context context = null;
-    /** The actual ConnectionFactory instance held within */
+    /** The JMS ConnectionFactory this definition refers to */
     private ConnectionFactory conFactory = null;
-    /** The JMS connection factory type */
-    private String connectionFactoryType = null;
-    /** The JMS Connection opened */
-    private Connection connection = null;
-    /** The axis2 configuration context */
-    private ConfigurationContext cfgCtx = null;
-    /** if connection dropped, reconnect timeout in milliseconds; default 30 seconds */
-    private long reconnectTimeout = 30000;
-
-    /**
-     * Create a JMSConnectionFactory for the given [axis2] name the
-     * JNDI name of the actual ConnectionFactory
-     *
-     * @param name the connection factory name specified in the axis2.xml for the
-     * TransportListener or the TransportSender using this
-     * @param jmsListener the JMS transport listener, or null if the connection factory
-     *                    is not linked to a transport listener
-     * @param workerPool the worker pool to be used to process incoming messages; may be null
-     * @param cfgCtx the axis2 configuration context
-     */
-    public JMSConnectionFactory(String name, JMSListener jmsListener, WorkerPool workerPool,
-                                ConfigurationContext cfgCtx) {
-        this.name = name;
-        this.jmsListener = jmsListener;
-        this.workerPool = workerPool;
-        this.cfgCtx = cfgCtx;
-        endpointJNDINameMapping = new HashMap<String,JMSEndpoint>();
-        jndiProperties = new Hashtable<String,String>();
-        jmsSessions = new HashMap<String,Session>();
-    }
-
-
-    /**
-     * Add a listen destination on this connection factory on behalf of the given service
-     *
-     * @param endpoint the {@link JMSEndpoint} object that specifies the destination and
-     *                 the service
-     */
-    public void addDestination(JMSEndpoint endpoint) {
-        String destinationJNDIName = endpoint.getJndiDestinationName();
-        String destinationName = getPhysicalDestinationName(destinationJNDIName);
-
-        if (destinationName == null) {
-            log.warn("JMS Destination with JNDI name : " + destinationJNDIName + " does not exist");
-
-            try {
-                log.info("Creating a JMS Queue with the JNDI name : " + destinationJNDIName +
-                    " using the connection factory definition named : " + name);
-                JMSUtils.createDestination(conFactory, destinationJNDIName, endpoint.getDestinationType());
-
-                destinationName = getPhysicalDestinationName(destinationJNDIName);
-                
-            } catch (JMSException e) {
-                log.error("Unable to create Destination with JNDI name : " + destinationJNDIName, e);
-                BaseUtils.markServiceAsFaulty(
-                    endpoint.getServiceName(),
-                    "Error creating JMS destination : " + destinationJNDIName,
-                    cfgCtx.getAxisConfiguration());
-                return;
-            }
-        }
-
-        endpointJNDINameMapping.put(destinationJNDIName, endpoint);
-
-        log.info("Mapped JNDI name : " + destinationJNDIName + " and JMS Destination name : " +
-            destinationName + " against service : " + endpoint.getServiceName());
-    }
+    /** The shared JMS Connection for this JMS connection factory */
+    private Connection sharedConnection = null;
+    /** The shared JMS Session for this JMS connection factory */
+    private Session sharedSession = null;
+    /** The shared JMS MessageProducer for this JMS connection factory */
+    private MessageProducer sharedProducer = null;
+    /** The Shared Destination */
+    private Destination sharedDestination = null;
+    /** The cache level for this JMS connection factory - defaults to caching the Connection */
+    private int cacheLevel = JMSConstants.CACHE_CONNECTION;
 
     /**
-     * Abort listening on the JMS destination from this connection factory
-     *
-     * @param jndiDestinationName the JNDI name of the JMS destination to be removed
+     * Digest a JMS CF definition from an axis2.xml 'Parameter' and construct this instance
+     * @param parameter the axis2.xml 'Parameter' that defined the JMS CF
      */
-    public void removeDestination(String jndiDestinationName) {
-        stoplisteningOnDestination(jndiDestinationName);
-        endpointJNDINameMapping.remove(jndiDestinationName);
-    }
+    public JMSConnectionFactory(Parameter parameter) {
 
-    /**
-     * Begin [or restart] listening for messages on the list of destinations associated
-     * with this connection factory. (Called during Axis2 initialization of
-     * the Transport receivers, or after a disconnection has been detected)
-     *
-     * When called from the JMS transport sender, this call simply acquires the actual
-     * JMS connection factory from the JNDI, creates a new connection and starts it.
-     *
-     * @throws JMSException on exceptions
-     * @throws NamingException on exceptions
-     */
-    public synchronized void connectAndListen() throws JMSException, NamingException {
+        this.name = parameter.getName();
+        ParameterIncludeImpl pi = new ParameterIncludeImpl();
 
-        // if this is a reconnection/re-initialization effort after the detection of a
-        // disconnection, close all sessions and the CF connection and re-initialize
-        if (connection != null) {
-            log.info("Re-initializing the JMS connection factory : " + name);
-
-            for (Session session : jmsSessions.values()) {
-                try {
-                    session.close();
-                } catch (JMSException ignore) {}
-            }
-            try {
-                connection.stop();
-            } catch (JMSException ignore) {}
-
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Initializing the JMS connection factory : " + name);
-            }
+        try {
+            pi.deserializeParameters((OMElement) parameter.getValue());
+        } catch (AxisFault axisFault) {
+            handleException("Error reading parameters for JMS connection factory" + name, axisFault);
         }
 
-        // get the CF reference freshly [again] from JNDI
-        context = new InitialContext(jndiProperties);
-        conFactory = JMSUtils.lookup(context, ConnectionFactory.class, connFactoryJNDIName);
-        log.info("Connected to the JMS connection factory : " + connFactoryJNDIName);
+        for (Object o : pi.getParameters()) {
+            Parameter p = (Parameter) o;
+            parameters.put(p.getName(), (String) p.getValue());
+        }
 
+        digestCacheLevel();
         try {
-            connection = JMSUtils.createConnection(conFactory,
-                    jndiProperties.get(Context.SECURITY_PRINCIPAL),
-                    jndiProperties.get(Context.SECURITY_CREDENTIALS),
-                    getConnectionFactoryType());
-            
-            connection.setExceptionListener(this);
-
-        } catch (JMSException e) {
-            handleException("Error connecting to Connection Factory : " + connFactoryJNDIName, e);
-        }
+            context = new InitialContext(parameters);
+            conFactory = JMSUtils.lookup(context, ConnectionFactory.class,
+                parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME));
+            if (parameters.get(JMSConstants.PARAM_DESTINATION) != null) {
+                sharedDestination = JMSUtils.lookup(context, Destination.class,
+                    parameters.get(JMSConstants.PARAM_DESTINATION));
+            }
+            log.info("JMS ConnectionFactory : " + name + " initialized");
 
-        for (JMSEndpoint endpoint : endpointJNDINameMapping.values()) {
-            startListeningOnDestination(endpoint);
+        } catch (NamingException e) {
+            throw new AxisJMSException("Cannot acquire JNDI context, JMS Connection factory : " +
+                parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME) + " or default destination : " +
+                parameters.get(JMSConstants.PARAM_DESTINATION) +
+                " for JMS CF : " + name + " using : " + parameters);
         }
-
-        connection.start(); // indicate readiness to start receiving messages
-        log.info("Connection factory : " + name + " initialized...");
     }
 
     /**
-     * Create a session for sending to the given destination and save it on the jmsSessions Map
-     * keyed by the destination JNDI name
-     * @param destinationJNDIname the destination JNDI name
-     * @return a JMS Session to send messages to the destination using this connection factory
+     * Digest the cache level value, if specified
      */
-    public Session getSessionForDestination(String destinationJNDIname) {
-
-        Session session = jmsSessions.get(destinationJNDIname);
+    private void digestCacheLevel() {
 
-        if (session == null) {
-            try {                
-                Destination dest = getPhysicalDestination(destinationJNDIname);
+        String key = JMSConstants.PARAM_CACHE_LEVEL;
+        String val = parameters.get(key);
 
-                if (dest instanceof Topic) {
-                    session = ((TopicConnection) connection).
-                        createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-                } else {
-                    session = ((QueueConnection) connection).
-                        createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-                }
-
-                jmsSessions.put(destinationJNDIname, session);
-
-            } catch (JMSException e) {
-                handleException("Unable to create a session using connection factory : " + name, e);
-            }
+        if ("none".equalsIgnoreCase(val)) {
+            this.cacheLevel = JMSConstants.CACHE_NONE;
+        } else if ("connection".equalsIgnoreCase(val)) {
+            this.cacheLevel = JMSConstants.CACHE_CONNECTION;
+        } else if ("session".equals(val)){
+            this.cacheLevel = JMSConstants.CACHE_SESSION;
+        } else if ("producer".equals(val)) {
+            this.cacheLevel = JMSConstants.CACHE_PRODUCER;
+        } else if (val != null) {
+            throw new AxisJMSException("Invalid cache level : " + val + " for JMS CF : " + name);
         }
-        return session;
     }
 
     /**
-     * Listen on the given destination from this connection factory. Used to
-     * start listening on a destination associated with a newly deployed service
-     *
-     * @param endpoint the JMS destination to listen on
-     */
-    public void startListeningOnDestination(JMSEndpoint endpoint) {
-        String destinationJNDIname = endpoint.getJndiDestinationName();
-        String destinationType = endpoint.getDestinationType();
-        Session session = jmsSessions.get(destinationJNDIname);
-        // if we already had a session open, close it first
-        if (session != null) {
-            try {
-                session.close();
-            } catch (JMSException ignore) {}
-        }
-
-        try {
-            session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE, destinationType);
-            Destination destination = null;
-
-            try {
-                destination = JMSUtils.lookup(context, Destination.class, destinationJNDIname);
-
-            } catch (NameNotFoundException e) {
-                log.warn("Cannot find destination : " + destinationJNDIname + ". Creating a Queue");
-                destination = JMSUtils.createDestination(session, destinationJNDIname, destinationType);
-            }
-
-            MessageConsumer consumer = JMSUtils.createConsumer(session, destination);
-            consumer.setMessageListener(new JMSMessageReceiver(jmsListener, this, workerPool,
-                    cfgCtx, endpoint));
-            jmsSessions.put(destinationJNDIname, session);
-
-        // catches NameNotFound and JMSExceptions and marks service as faulty    
-        } catch (Exception e) {
-            if (session != null) {
-                try {
-                    session.close();
-                } catch (JMSException ignore) {}
-            }
-
-            BaseUtils.markServiceAsFaulty(
-                endpoint.getServiceName(),
-                "Error looking up JMS destination : " + destinationJNDIname,
-                cfgCtx.getAxisConfiguration());
-        }
+     * Return the name assigned to this JMS CF definition
+     * @return name of the JMS CF
+     */
+    public String getName() {
+        return name;
     }
 
     /**
-     * Stop listening on the given destination - for undeployment or stopping of services
-     * closes the underlying Session opened to subscribe to the destination
-     *
-     * @param destinationJNDIname the JNDI name of the JMS destination
+     * The list of properties (including JNDI and non-JNDI)
+     * @return properties defined on the JMS CF
      */
-    private void stoplisteningOnDestination(String destinationJNDIname) {
-        Session session = jmsSessions.get(destinationJNDIname);
-        if (session != null) {
-            try {
-                session.close();
-            } catch (JMSException ignore) {}
-        }
+    public Hashtable<String, String> getParameters() {
+        return parameters;
     }
 
-
     /**
-     * Close all connections, sessions etc.. and stop this connection factory
+     * Get cached InitialContext
+     * @return cached InitialContext
      */
-    public void stop() {
-        if (connection != null) {
-            for (Session session : jmsSessions.values()) {
-                try {
-                    session.close();
-                } catch (JMSException ignore) {}
-            }
-            try {
-                connection.close();
-            } catch (JMSException e) {
-                log.warn("Error shutting down connection factory : " + name, e);
-            }
-        }
+    public Context getContext() {
+        return context;
     }
 
     /**
-     * Return the provider specific [physical] Destination name if any
-     * for the destination with the given JNDI name
-     *
-     * @param destinationJndi the JNDI name of the destination
-     * @return the provider specific Destination name or null if cannot be found
-     */
-    private String getPhysicalDestinationName(String destinationJndi) {
-        Destination destination = getPhysicalDestination(destinationJndi);
-
-        if (destination != null) {
-            try {
-                if (destination instanceof Queue) {
-                    return ((Queue) destination).getQueueName();
-                } else if (destination instanceof Topic) {
-                    return ((Topic) destination).getTopicName();
-                }
-            } catch (JMSException e) {
-                log.warn("Error reading Destination name for JNDI destination : " + destinationJndi, e);
-            }
-        }
-        return null;
+     * Cache level applicable for this JMS CF
+     * @return applicable cache level
+     */
+    public int getCacheLevel() {
+        return cacheLevel;
     }
-    
+
     /**
-     * Return the provider specific [physical] Destination if any
-     * for the destination with the given JNDI name
-     *
-     * @param destinationJndi the JNDI name of the destination
-     * @return the provider specific Destination or null if cannot be found
+     * Get the shared Destination - if defined
+     * @return the shared Destination, or null if none is defined
      */
-    private Destination getPhysicalDestination(String destinationJndi) {
-        Destination destination = null;
+    public Destination getSharedDestination() {
+        return sharedDestination;
+    }
 
+    /**
+     * Lookup a Destination using this JMS CF definitions and JNDI name
+     * @param name JNDI name of the Destination
+     * @return JMS Destination for the given JNDI name or null
+     */
+    public Destination getDestination(String name) {
         try {
-            destination = JMSUtils.lookup(context, Destination.class, destinationJndi);
+            return JMSUtils.lookup(context, Destination.class, name);
         } catch (NamingException e) {
-
-            // if we are using ActiveMQ, check for dynamic Queues and Topics
-            String provider = jndiProperties.get(Context.INITIAL_CONTEXT_FACTORY);
-            if (provider.indexOf("activemq") != -1) {
-                try {
-                    destination = JMSUtils.lookup(context, Destination.class,
-                        JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE + destinationJndi);
-                } catch (NamingException ne) {
-                    try {
-                        destination = JMSUtils.lookup(context, Destination.class,
-                            JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC + destinationJndi);
-                    } catch (NamingException e1) {
-                        log.warn("Error looking up destination for JNDI name : " + destinationJndi);
-                    }
-                }
-            }
+            handleException("Unknown JMS Destination : " + name + " using : " + parameters, e);
         }
-
-        return destination;
+        return null;
     }
 
     /**
-     * Return the EPR for the JMS Destination with the given JNDI name
-     * when using this connection factory
-     * @param jndiDestination the JNDI name of the JMS destination
-     * @return the EPR for a service using this destination
+     * Get the reply Destination from the PARAM_REPLY_DESTINATION parameter
+     * @return reply destination defined in the JMS CF
      */
-    public EndpointReference getEPRForDestination(String jndiDestination) {
-
-        StringBuffer sb = new StringBuffer();
-        sb.append(JMSConstants.JMS_PREFIX).append(jndiDestination);
-        sb.append("?").
-            append(JMSConstants.CONFAC_JNDI_NAME_PARAM).
-            append("=").append(getConnFactoryJNDIName());
-        for (Map.Entry<String,String> entry : getJndiProperties().entrySet()) {
-            sb.append("&").append(entry.getKey()).append("=").append(entry.getValue());
-        }
-
-        return new EndpointReference(sb.toString());
+    public String getReplyToDestination() {
+        return parameters.get(JMSConstants.PARAM_REPLY_DESTINATION);
     }
 
-    // -------------------- getters and setters and trivial methods --------------------
-
-    public void setConnFactoryJNDIName(String connFactoryJNDIName) {
-        this.connFactoryJNDIName = connFactoryJNDIName;
+    private void handleException(String msg, Exception e) {
+        log.error(msg, e);
+        throw new AxisJMSException(msg, e);
     }
 
-    public Destination getDestination(String destinationJNDIName) {
-        try {
-            return JMSUtils.lookup(context, Destination.class, destinationJNDIName);
-        } catch (NamingException ignore) {}
-        return null;
+    /**
+     * Should the JMS 1.1 API be used? - defaults to yes
+     * @return true, if JMS 1.1 api should  be used
+     */
+    public boolean isJmsSpec11() {
+        return parameters.get(JMSConstants.PARAM_JMS_SPEC_VER) == null ||
+            "1.1".equals(parameters.get(JMSConstants.PARAM_JMS_SPEC_VER));
     }
 
-    public void addJNDIContextProperty(String key, String value) {
-        jndiProperties.put(key, value);
-    }
+    /**
+     * Return the type of the JMS CF Destination
+     * @return TRUE if a Queue, FALSE for a Topic and NULL for a JMS 1.1 Generic Destination
+     */
+    public Boolean isQueue() {
+        if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) == null &&
+            parameters.get(JMSConstants.PARAM_DEST_TYPE) == null) {
+            return null;
+        }
 
-    public String getName() {
-        return name;
+        if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) != null) {
+            if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) {
+                return true;
+            } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) {
+                return false;
+            } else {
+                throw new AxisJMSException("Invalid " + JMSConstants.PARAM_CONFAC_TYPE + " : " +
+                    parameters.get(JMSConstants.PARAM_CONFAC_TYPE) + " for JMS CF : " + name);
+            }
+        } else {
+            if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) {
+                return true;
+            } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) {
+                return false;
+            } else {
+                throw new AxisJMSException("Invalid " + JMSConstants.PARAM_DEST_TYPE + " : " +
+                    parameters.get(JMSConstants.PARAM_DEST_TYPE) + " for JMS CF : " + name);
+            }
+        }
     }
 
-    public String getConnFactoryJNDIName() {
-        return connFactoryJNDIName;
+    /**
+     * Is a session transaction requested from users of this JMS CF?
+     * @return true if clients of this JMS CF require a transacted session (default when unset)
+     */
+    private boolean isSessionTransacted() {
+        return parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED) == null ||
+            Boolean.valueOf(parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED));
     }
 
-    public ConnectionFactory getConFactory() {
-        return conFactory;
-    }
+    /**
+     * Create a new Connection
+     * @return a new Connection
+     */
+    private Connection createConnection() {
 
-    public Hashtable<String,String> getJndiProperties() {
-        return jndiProperties;
-    }
+        Connection connection = null;
+        try {
+            connection = JMSUtils.createConnection(
+                conFactory,
+                parameters.get(JMSConstants.PARAM_JMS_USERNAME),
+                parameters.get(JMSConstants.PARAM_JMS_PASSWORD),
+                isJmsSpec11(), isQueue());
 
-    public Context getContext() {
-        return context;
-    }
+            if (log.isDebugEnabled()) {
+                log.debug("New JMS Connection from JMS CF : " + name + " created");
+            }
 
-    private void handleException(String msg, Exception e) throws AxisJMSException {
-        log.error(msg, e);
-        throw new AxisJMSException(msg, e);
+        } catch (JMSException e) {
+            handleException("Error acquiring a Connection from the JMS CF : " + name +
+                " using properties : " + parameters, e);
+        }
+        return connection;
     }
 
-    public String getConnectionFactoryType() {
-      return connectionFactoryType;
-    }
+    /**
+     * Create a new Session
+     * @param connection Connection to use
+     * @return A new Session
+     */
+    private Session createSession(Connection connection) {
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Creating a new JMS Session from JMS CF : " + name);
+            }
+            return JMSUtils.createSession(
+                connection, isSessionTransacted(), Session.AUTO_ACKNOWLEDGE, isJmsSpec11(), isQueue());
 
-    public void setConnectionFactoryType(String connectionFactoryType) {
-      this.connectionFactoryType = connectionFactoryType;
-    }
-    
-    public long getReconnectTimeout() {
-      return reconnectTimeout;
+        } catch (JMSException e) {
+            handleException("Error creating JMS session from JMS CF : " + name, e);
+        }
+        return null;
     }
 
-    public void setReconnectTimeout(long reconnectTimeout) {
-      this.reconnectTimeout = reconnectTimeout;
-    }
+    /**
+     * Create a new MessageProducer
+     * @param session Session to be used
+     * @param destination Destination to be used
+     * @return a new MessageProducer
+     */
+    private MessageProducer createProducer(Session session, Destination destination) {
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Creating a new JMS MessageProducer from JMS CF : " + name);
+            }
 
-    public void onException(JMSException e) {
-        log.error("JMS connection factory " + name + " encountered an error", e);
-        boolean wasError = true;
+            return JMSUtils.createProducer(
+                session, destination, isQueue(), isJmsSpec11());
 
-        if (jmsListener != null) {
-            jmsListener.error(null, e);
+        } catch (JMSException e) {
+            handleException("Error creating JMS producer from JMS CF : " + name,e);
         }
+        return null;
+    }
 
-        // try to connect
-        // if error occurs wait and try again
-        while (wasError == true) {
+    /**
+     * Get a new Connection or shared Connection from this JMS CF
+     * @return new or shared Connection from this JMS CF
+     */
+    public Connection getConnection() {
+        if (cacheLevel > JMSConstants.CACHE_NONE) {
+            return getSharedConnection();
+        } else {
+            return createConnection();
+        }
+    }
 
-            try {
-                connectAndListen();
-                wasError = false;
+    /**
+     * Get a new Session or shared Session from this JMS CF
+     * @param connection the Connection to be used
+     * @return new or shared Session from this JMS CF
+     */
+    public Session getSession(Connection connection) {
+        if (cacheLevel > JMSConstants.CACHE_CONNECTION) {
+            return getSharedSession();
+        } else {
+            return createSession((connection == null ? getConnection() : connection));
+        }
+    }
 
-            } catch (Exception e1) {
-                log.warn("JMS reconnection attempt failed for connection factory : " + name, e);
-            }
+    /**
+     * Get a new MessageProducer or shared MessageProducer from this JMS CF
+     * @param connection the Connection to be used
+     * @param session the Session to be used
+     * @param destination the Destination to bind MessageProducer to
+     * @return new or shared MessageProducer from this JMS CF
+     */
+    public MessageProducer getMessageProducer(
+        Connection connection, Session session, Destination destination) {
+        if (cacheLevel > JMSConstants.CACHE_SESSION) {
+            return getSharedProducer();
+        } else {
+            return createProducer((session == null ? getSession(connection) : session), destination);
+        }
+    }
 
-            if (wasError == true) {
-                try {
-                    log.info("Attempting reconnection for connection factory " + name +
-                        " in " + getReconnectTimeout()/1000 +  " seconds");
-                    Thread.sleep(getReconnectTimeout());
-                } catch (InterruptedException ignore) {}
+    /**
+     * Get the shared Connection from this JMS CF, creating it on first use
+     * @return shared Connection from this JMS CF
+     */
+    private Connection getSharedConnection() {
+        if  (sharedConnection == null) {
+            sharedConnection = createConnection();
+            if (log.isDebugEnabled()) {
+                log.debug("Created shared JMS Connection for JMS CF : " + name);
             }
-        } // wasError
-
+        }
+        return sharedConnection;
     }
 
     /**
-     * Temporarily pause receiving new messages
+     * Get a shared Session from this JMS CF
+     * @return shared Session from this JMS CF
      */
-    public void pause() {
-        try {
-            connection.stop();
-        } catch (JMSException e) {
-            handleException("Error pausing JMS connection for factory : " + name, e);
+    private Session getSharedSession() {
+        if (sharedSession == null) {
+            sharedSession = createSession(getSharedConnection());
+            if (log.isDebugEnabled()) {
+                log.debug("Created shared JMS Session for JMS CF : " + name);
+            }
         }
+        return sharedSession;
     }
 
     /**
-     * Resume from temporarily pause
+     * Get a shared MessageProducer from this JMS CF
+     * @return shared MessageProducer from this JMS CF
      */
-    public void resume() {
-        try {
-            connection.start();
-        } catch (JMSException e) {
-            handleException("Error resuming JMS connection for factory : " + name, e);
+    private MessageProducer getSharedProducer() {
+        if (sharedProducer == null) {
+            sharedProducer = createProducer(getSharedSession(), sharedDestination);
+            if (log.isDebugEnabled()) {
+                log.debug("Created shared JMS MessageConsumer for JMS CF : " + name);
+            }
         }
+        return sharedProducer;
     }
 }

Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java Mon Dec  8 10:15:40 2008
@@ -36,80 +36,43 @@
  * Class managing a set of {@link JMSConnectionFactory} objects.
  */
 public class JMSConnectionFactoryManager {
+
     private static final Log log = LogFactory.getLog(JMSConnectionFactoryManager.class);
-    
+
     /** A Map containing the JMS connection factories managed by this, keyed by name */
-    private final Map<String,JMSConnectionFactory> connectionFactories = new HashMap<String,JMSConnectionFactory>();
-    
-    private final ConfigurationContext cfgCtx;
-    
-    private final JMSListener jmsListener;
-    
-    private final WorkerPool workerPool;
-    
-    public JMSConnectionFactoryManager(ConfigurationContext cfgCtx) {
-        this.cfgCtx = cfgCtx;
-        jmsListener = null;
-        workerPool = null;
-    }
-    
-    public JMSConnectionFactoryManager(ConfigurationContext cfgCtx, JMSListener jmsListener, WorkerPool workerPool) {
-        this.cfgCtx = cfgCtx;
-        this.jmsListener = jmsListener;
-        this.workerPool = workerPool;
+    private final Map<String,JMSConnectionFactory> connectionFactories =
+        new HashMap<String,JMSConnectionFactory>();
+
+    /**
+     * Construct a Connection factory manager for the JMS transport sender or receiver
+     * @param trpInDesc the transport description holding the connection factory definitions
+     */
+    public JMSConnectionFactoryManager(ParameterInclude trpInDesc) {
+        loadConnectionFactoryDefinitions(trpInDesc);
     }
-    
+
     /**
      * Create JMSConnectionFactory instances for the definitions in the transport configuration,
      * and add these into our collection of connectionFactories map keyed by name
      *
      * @param trpDesc the transport description for JMS
      */
-    public void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) {
-
-        // iterate through all defined connection factories
-        Iterator<?> conFacIter = trpDesc.getParameters().iterator();
-
-        while (conFacIter.hasNext()) {
-            Parameter conFacParams = (Parameter) conFacIter.next();
-
-            JMSConnectionFactory jmsConFactory =
-                new JMSConnectionFactory(conFacParams.getName(), jmsListener, workerPool, cfgCtx);
-            JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory);
+    private void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) {
 
-            connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
-        }
-    }
-    
-    /**
-     * Get the names of the defined connection factories.
-     * @return
-     */
-    public String[] getNames() {
-        Collection<String> result = connectionFactories.keySet();
-        return result.toArray(new String[result.size()]);
-    }
-    
-    /**
-     * Start all connection factories.
-     * 
-     * @throws AxisFault
-     */
-    public void start() throws AxisFault {
-        for (JMSConnectionFactory conFac : connectionFactories.values()) {
+        for (Object o : trpDesc.getParameters()) {
+            JMSConnectionFactory jmsConFactory = null;
             try {
-                conFac.connectAndListen();
-            } catch (JMSException e) {
-                handleException("Error starting connection factory : " + conFac.getName(), e);
-            } catch (NamingException e) {
-                handleException("Error starting connection factory : " + conFac.getName(), e);
+                jmsConFactory = new JMSConnectionFactory((Parameter) o);
+                connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
+            } catch (AxisJMSException e) {
+                log.error("Error setting up connection factory : " + jmsConFactory.getName(), e);
             }
         }
     }
 
     /**
      * Get the JMS connection factory with the given name.
-     * 
+     *
      * @param name the name of the JMS connection factory
      * @return the JMS connection factory or null if no connection factory with
      *         the given name exists
@@ -117,72 +80,45 @@
     public JMSConnectionFactory getJMSConnectionFactory(String name) {
         return connectionFactories.get(name);
     }
-    
+
     /**
      * Get the JMS connection factory that matches the given properties, i.e. referring to
-     * the same underlying connection factory.
-     * 
-     * @param props
+     * the same underlying connection factory. Used by the JMSSender to determine if already
+     * available resources should be used for outgoing messages
+     *
+     * @param props a Map of connection factory JNDI properties and name
      * @return the JMS connection factory or null if no connection factory compatible
      *         with the given properties exists
      */
     public JMSConnectionFactory getJMSConnectionFactory(Map<String,String> props) {
         for (JMSConnectionFactory cf : connectionFactories.values()) {
-            Map<String,String> jndiProperties = cf.getJndiProperties();
-            if (equals(props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM), jndiProperties.get(JMSConstants.CONFAC_JNDI_NAME_PARAM))
+            Map<String,String> cfProperties = cf.getParameters();
+
+            if (equals(props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME),
+                cfProperties.get(JMSConstants.PARAM_CONFAC_JNDI_NAME))
                 &&
-                equals(props.get(Context.INITIAL_CONTEXT_FACTORY), jndiProperties.get(Context.INITIAL_CONTEXT_FACTORY))
+                equals(props.get(Context.INITIAL_CONTEXT_FACTORY),
+                    cfProperties.get(Context.INITIAL_CONTEXT_FACTORY))
                 &&
-                equals(props.get(Context.PROVIDER_URL), jndiProperties.get(Context.PROVIDER_URL))
+                equals(props.get(Context.PROVIDER_URL),
+                    cfProperties.get(Context.PROVIDER_URL))
                 &&
-                equals(props.get(Context.SECURITY_PRINCIPAL), jndiProperties.get(Context.SECURITY_PRINCIPAL))
+                equals(props.get(Context.SECURITY_PRINCIPAL),
+                    cfProperties.get(Context.SECURITY_PRINCIPAL))
                 &&
-                equals(props.get(Context.SECURITY_CREDENTIALS), jndiProperties.get(Context.SECURITY_CREDENTIALS))) {
+                equals(props.get(Context.SECURITY_CREDENTIALS),
+                    cfProperties.get(Context.SECURITY_CREDENTIALS))) {
                 return cf;
             }
         }
         return null;
     }
-    
-    /**
-     *     Prevents NullPointerException when s1 is null.
-     *     If both values are null this returns true 
-     */
-    private static boolean equals(Object s1, Object s2) {
-        if(s1 == s2) {
-            return true;
-        } else if(s1 != null && s1.equals(s2)) {
-            return true;
-        } else {
-            return false;
-        }
-    }
 
     /**
-     * Pause all connection factories.
+     * Compare two values preventing NPEs
      */
-    public void pause() {
-        for (JMSConnectionFactory conFac : connectionFactories.values()) {
-            conFac.pause();
-        }
-    }
-    
-    /**
-     * Resume all connection factories.
-     */
-    public void resume() {
-        for (JMSConnectionFactory conFac : connectionFactories.values()) {
-            conFac.resume();
-        }
-    }
-    
-    /**
-     * Stop all connection factories.
-     */
-    public void stop() {
-        for (JMSConnectionFactory conFac : connectionFactories.values()) {
-            conFac.stop();
-        }
+    private static boolean equals(Object s1, Object s2) {
+        return s1 == s2 || s1 != null && s1.equals(s2);
     }
 
     protected void handleException(String msg, Exception e) throws AxisFault {

Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java Mon Dec  8 10:15:40 2008
@@ -24,10 +24,7 @@
      */
     public static final String JMS_PREFIX = "jms:/";
 
-    public static final String ACTIVEMQ_DYNAMIC_QUEUE = "dynamicQueues/";
-    public static final String ACTIVEMQ_DYNAMIC_TOPIC = "dynamicTopics/";
-
-    //------------------------------------ defaults ------------------------------------
+    //------------------------------------ defaults / constants ------------------------------------
     /**
      * The local (Axis2) JMS connection factory name of the default connection
      * factory to be used, if a service does not explicitly state the connection
@@ -35,116 +32,234 @@
      */
     public static final String DEFAULT_CONFAC_NAME = "default";
     /**
-     * The default JMS time out waiting for a reply
+     * The default JMS time out waiting for a reply - also see {@link JMS_WAIT_REPLY}
      */
     public static final long DEFAULT_JMS_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS;
-
-    //-------------------------- services.xml parameters --------------------------------
     /**
-     * The Parameter name indicating a JMS destination for requests
+     * Value indicating a Queue used for {@link #PARAM_DEST_TYPE}, {@link #PARAM_REPLY_DEST_TYPE}
      */
-    public static final String DEST_PARAM = "transport.jms.Destination";
+    public static final String DESTINATION_TYPE_QUEUE = "queue";
     /**
-     * The Parameter name indicating a JMS destination type for requests. i.e. DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC
+     * Value indicating a Topic used for {@link #PARAM_DEST_TYPE}, {@link #PARAM_REPLY_DEST_TYPE}
      */
-    public static final String DEST_PARAM_TYPE = "transport.jms.DestinationType";
+    public static final String DESTINATION_TYPE_TOPIC = "topic";
     /**
-     * The Parameter name indicating the response JMS destination
+     * Value indicating a JMS 1.1 Generic Destination used by {@link #PARAM_DEST_TYPE}, {@link #PARAM_REPLY_DEST_TYPE}
      */
-    public static final String REPLY_PARAM = "transport.jms.ReplyDestination";
+    public static final String DESTINATION_TYPE_GENERIC = "generic";
+
+    /** Do not cache any JMS resources between tasks (when receiving) or JMS CF's (when sending) */
+	public static final int CACHE_NONE = 0;
+	/** Cache only the JMS connection between tasks (when receiving), or JMS CF's (when sending)*/
+	public static final int CACHE_CONNECTION = 1;
+	/** Cache only the JMS connection and Session between tasks (receiving), or JMS CF's (sending) */
+	public static final int CACHE_SESSION = 2;
+	/** Cache the JMS connection, Session and Consumer between tasks when receiving*/
+	public static final int CACHE_CONSUMER = 3;
+	/** Cache the JMS connection, Session and Producer within a JMSConnectionFactory when sending */
+	public static final int CACHE_PRODUCER = 4;
+    /** automatic choice of an appropriate caching level (depending on the transaction strategy) */
+	public static final int CACHE_AUTO = 5;
+
+    /** A JMS 1.1 Generic Destination type or ConnectionFactory */
+    public static final int GENERIC = 0;
+    /** A Queue Destination type or ConnectionFactory */
+    public static final int QUEUE = 1;
+    /** A Topic Destination type or ConnectionFactory */
+    public static final int TOPIC = 2;
+
     /**
-     * The Parameter name indicating the response JMS destination. i.e. DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC
+     * The EPR parameter name indicating the name of the message level property that indicates the content type.
      */
-    public static final String REPLY_PARAM_TYPE = "transport.jms.ReplyDestinationType";
+    public static final String CONTENT_TYPE_PROPERTY_PARAM = "transport.jms.ContentTypeProperty";
+
+    //---------------------------------- services.xml parameters -----------------------------------
     /**
-     * The EPR parameter name indicating the message property to use to store the content type.
+     * The Service level Parameter name indicating the JMS destination for requests of a service
      */
-    public static final String CONTENT_TYPE_PROPERTY_PARAM = "transport.jms.ContentTypeProperty";
-    
+    public static final String PARAM_DESTINATION = "transport.jms.Destination";
     /**
-     * Values used for DEST_PARAM_TYPE, REPLY_PARAM_TYPE
+     * The Service level Parameter name indicating the destination type for requests.
+     * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC}
      */
-    public static final String DESTINATION_TYPE_QUEUE = "queue";
-    public static final String DESTINATION_TYPE_TOPIC = "topic";
-
+    public static final String PARAM_DEST_TYPE = "transport.jms.DestinationType";
+    /**
+     * The Service level Parameter name indicating the [default] response destination of a service
+     */
+    public static final String PARAM_REPLY_DESTINATION = "transport.jms.ReplyDestination";
+    /**
+     * The Service level Parameter name indicating the response destination type
+     * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC}
+     */
+    public static final String PARAM_REPLY_DEST_TYPE = "transport.jms.ReplyDestinationType";
     /**
      * The Parameter name of an Axis2 service, indicating the JMS connection
      * factory which should be used to listen for messages for it. This is
      * the local (Axis2) name of the connection factory and not the JNDI name
      */
-    public static final String CONFAC_PARAM = "transport.jms.ConnectionFactory";
-    /**
-     * If reconnect timeout if connection error occurs in seconds
-     */
-    public static final String RECONNECT_TIMEOUT = "transport.jms.ReconnectTimeout";
+    public static final String PARAM_JMS_CONFAC = "transport.jms.ConnectionFactory";
     /**
      * Connection factory type if using JMS 1.0, either DESTINATION_TYPE_QUEUE or DESTINATION_TYPE_TOPIC
      */
-    public static final String CONFAC_TYPE = "transport.jms.ConnectionFactoryType";
+    public static final String PARAM_CONFAC_TYPE = "transport.jms.ConnectionFactoryType";
     /**
      * The Parameter name indicating the JMS connection factory JNDI name
      */
-    public static final String CONFAC_JNDI_NAME_PARAM = "transport.jms.ConnectionFactoryJNDIName";
+    public static final String PARAM_CONFAC_JNDI_NAME = "transport.jms.ConnectionFactoryJNDIName";
     /**
-     * The parameter indicating the expected content type for messages received by the service.
+     * The Parameter indicating the expected content type for messages received by the service.
      */
     public static final String CONTENT_TYPE_PARAM = "transport.jms.ContentType";
+    /**
+     * The Parameter indicating a final EPR as a String, to be published on the WSDL of a service
+     * Could occur more than once, and could provide additional connection properties or a subset
+     * of the properties auto computed. Also could replace IP addresses with hostnames, and expose
+     * public credentials to clients. If a user specifies this parameter, the auto generated EPR will
+     * not be exposed - unless an instance of this parameter is added with the string "legacy"
+     * This parameter could be used to expose EPR's conforming to the proposed SOAP/JMS spec
+     * until such time full support is implemented for it.
+     */
+    public static final String PARAM_PUBLISH_EPR = "transport.jms.PublishEPR";
+    /** The parameter indicating the JMS API specification to be used - if this is "1.1" the JMS
+     * 1.1 API would be used, else the JMS 1.0.2B
+     */
+    public static final String PARAM_JMS_SPEC_VER = "transport.jms.JMSSpecVersion";
 
-    //------------ message context / transport header properties and client options ------------
     /**
-     * A MessageContext property or client Option stating the JMS message type
+     * The Parameter indicating whether the JMS Session should be transacted for the service
+     * Specified as a "true" or "false"
+     */
+    public static final String PARAM_SESSION_TRANSACTED = "transport.jms.SessionTransacted";
+    /**
+     * The Parameter indicating the Session acknowledgement for the service. Must be one of the
+     * following Strings, or the appropriate Integer used by the JMS API
+     * "AUTO_ACKNOWLEDGE", "CLIENT_ACKNOWLEDGE", "DUPS_OK_ACKNOWLEDGE" or "SESSION_TRANSACTED"
+     */
+    public static final String PARAM_SESSION_ACK = "transport.jms.SessionAcknowledgement";
+    /** A message selector to be used when messages are sought for this service */
+    public static final String PARAM_MSG_SELECTOR = "transport.jms.MessageSelector";
+    /** Is the Subscription durable ? - "true" or "false" See {@link PARAM_DURABLE_SUB_NAME} */
+    public static final String PARAM_SUB_DURABLE = "transport.jms.SubscriptionDurable";
+    /** The name for the durable subscription See {@link PARAM_SUB_DURABLE}*/
+    public static final String PARAM_DURABLE_SUB_NAME = "transport.jms.DurableSubscriberName";
+    /**
+     * JMS Resource cachable level to be used for the service One of the following:
+     * {@link CACHE_NONE}, {@link CACHE_CONNECTION}, {@link CACHE_SESSION}, {@link CACHE_PRODUCER},
+     * {@link CACHE_CONSUMER}, or {@link CACHE_AUTO} - to let the transport decide
+     */
+    public static final String PARAM_CACHE_LEVEL = "transport.jms.CacheLevel";
+    /** Should a pub-sub connection receive messages published by itself? */
+    public static final String PARAM_PUBSUB_NO_LOCAL = "transport.jms.PubSubNoLocal";
+    /**
+     * The number of milliseconds to wait for a message on a consumer.receive() call
+     * negative number - wait forever
+     * 0 - do not wait at all
+     * positive number - indicates the number of milliseconds to wait
+     */
+    public static final String PARAM_RCV_TIMEOUT = "transport.jms.ReceiveTimeout";
+    /**
+     * The number of concurrent consumers to be created to poll for messages for this service
+     * For Topics, this should be ONE, to prevent receipt of multiple copies of the same message
+     */
+    public static final String PARAM_CONCURRENT_CONSUMERS = "transport.jms.ConcurrentConsumers";
+    /**
+     * The maximum number of concurrent consumers for the service - See {@link PARAM_CONCURRENT_CONSUMERS}
+     */
+    public static final String PARAM_MAX_CONSUMERS = "transport.jms.MaxConcurrentConsumers";
+    /**
+     * The number of idle (i.e. message-less) polling attempts before a worker task commits suicide,
+     * to scale down resources, as load decreases
+     */
+    public static final String PARAM_IDLE_TASK_LIMIT = "transport.jms.IdleTaskLimit";
+    /**
+     * The maximum number of messages a polling worker task should process, before suicide - to
+     * prevent many longer running threads - default is unlimited (i.e. a worker task will live forever)
+     */
+    public static final String PARAM_MAX_MSGS_PER_TASK = "transport.jms.MaxMessagesPerTask";
+    /**
+     * Number of milliseconds to wait before the first reconnection attempt, on detection of an
+     * error. Subsequent retries follow a geometric series, where the
+     * duration = previous duration * factor
+     * The resulting duration is further limited by {@link #PARAM_RECON_MAX_DURATION}
+     */
+    public static final String PARAM_RECON_INIT_DURATION = "transport.jms.InitialReconnectDuration";
+    /** @see #PARAM_RECON_INIT_DURATION */
+    public static final String PARAM_RECON_FACTOR = "transport.jms.ReconnectProgressFactor";
+    /** @see #PARAM_RECON_INIT_DURATION */
+    public static final String PARAM_RECON_MAX_DURATION = "transport.jms.MaxReconnectDuration";
+
+    /** The username to use when obtaining a JMS Connection */
+    public static final String PARAM_JMS_USERNAME = "transport.jms.UserName";
+    /** The password to use when obtaining a JMS Connection */
+    public static final String PARAM_JMS_PASSWORD = "transport.jms.Password";
+
+    //-------------- message context / transport header properties and client options --------------
+    /**
+     * A MessageContext property or client Option indicating the JMS message type
      */
     public static final String JMS_MESSAGE_TYPE = "JMS_MESSAGE_TYPE";
     /**
-     * The message type indicating a BytesMessage. See JMS_MESSAGE_TYPE
+     * The message type indicating a BytesMessage. See {@link #JMS_MESSAGE_TYPE}
      */
     public static final String JMS_BYTE_MESSAGE = "JMS_BYTE_MESSAGE";
     /**
-     * The message type indicating a TextMessage. See JMS_MESSAGE_TYPE
+     * The message type indicating a TextMessage. See {@link #JMS_MESSAGE_TYPE}
      */
     public static final String JMS_TEXT_MESSAGE = "JMS_TEXT_MESSAGE";
     /**
-     * A MessageContext property or client Option stating the time to wait for a response JMS message
+     * A MessageContext property or client Option indicating the time to wait for a response JMS message
      */
     public static final String JMS_WAIT_REPLY = "JMS_WAIT_REPLY";
     /**
-     * A MessageContext property or client Option stating the JMS correlation id
+     * A MessageContext property or client Option indicating the JMS correlation id
      */
     public static final String JMS_COORELATION_ID = "JMS_COORELATION_ID";
     /**
-     * A MessageContext property or client Option stating the JMS message id
+     * A MessageContext property or client Option indicating the JMS message id
      */
     public static final String JMS_MESSAGE_ID = "JMS_MESSAGE_ID";
     /**
-     * A MessageContext property or client Option stating the JMS delivery mode
+     * A MessageContext property or client Option indicating the JMS delivery mode as an Integer or String
+     * Value 1 - javax.jms.DeliveryMode.NON_PERSISTENT
+     * Value 2 - javax.jms.DeliveryMode.PERSISTENT
      */
     public static final String JMS_DELIVERY_MODE = "JMS_DELIVERY_MODE";
     /**
-     * A MessageContext property or client Option stating the JMS destination
+     * A MessageContext property or client Option indicating the JMS destination to use on a Send
      */
     public static final String JMS_DESTINATION = "JMS_DESTINATION";
     /**
-     * A MessageContext property or client Option stating the JMS expiration
+     * A MessageContext property or client Option indicating the JMS message expiration - a Long value
+     * specified as a String
      */
     public static final String JMS_EXPIRATION = "JMS_EXPIRATION";
     /**
-     * A MessageContext property or client Option stating the JMS priority
-     */
-    public static final String JMS_PRIORITY = "JMS_PRIORITY";
-    /**
-     * A MessageContext property stating if the message is a redelivery
+     * A MessageContext property indicating if the message is a redelivery (Boolean as a String)
      */
     public static final String JMS_REDELIVERED = "JMS_REDELIVERED";
     /**
-     * A MessageContext property or client Option stating the JMS replyTo
+     * A MessageContext property or client Option indicating the JMS replyTo Destination
      */
     public static final String JMS_REPLY_TO = "JMS_REPLY_TO";
     /**
-     * A MessageContext property or client Option stating the JMS timestamp
+     * A MessageContext property or client Option indicating the JMS replyTo Destination type
+     * See {@link #DESTINATION_TYPE_QUEUE} and {@link #DESTINATION_TYPE_TOPIC}
+     */
+    public static final String JMS_REPLY_TO_TYPE = "JMS_REPLY_TO_TYPE";
+    /**
+     * A MessageContext property or client Option indicating the JMS timestamp (Long specified as String)
      */
     public static final String JMS_TIMESTAMP = "JMS_TIMESTAMP";
     /**
-     * A MessageContext property or client Option stating the JMS type
+     * A MessageContext property indicating the JMS type String returned by {@link javax.jms.Message#getJMSType()}
      */
     public static final String JMS_TYPE = "JMS_TYPE";
+    /**
+     * A MessageContext property or client Option indicating the JMS priority
+     */
+    public static final String JMS_PRIORITY = "JMS_PRIORITY";
+    /**
+     * A MessageContext property or client Option indicating the JMS time to live for message sent
+     */
+    public static final String JMS_TIME_TO_LIVE = "JMS_TIME_TO_LIVE";
 }

Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java Mon Dec  8 10:15:40 2008
@@ -16,7 +16,14 @@
 package org.apache.axis2.transport.jms;
 
 import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
 import org.apache.axis2.transport.jms.ctype.ContentTypeRuleSet;
+import org.apache.axis2.addressing.EndpointReference;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
 
 /**
  * Class that links an Axis2 service to a JMS destination. Additionally, it contains
@@ -24,10 +31,11 @@
  * into Axis2.
  */
 public class JMSEndpoint {
+    private JMSConnectionFactory cf;
     private AxisService service;
     private String jndiDestinationName;
-    private String destinationType;
-    private String endpointReference;
+    private int destinationType = JMSConstants.GENERIC;
+    private Set<EndpointReference> endpointReferences = new HashSet<EndpointReference>();
     private ContentTypeRuleSet contentTypeRuleSet;
 
     public AxisService getService() {
@@ -50,20 +58,39 @@
         this.jndiDestinationName = destinationJNDIName;
     }
 
-    public String getDestinationType() {
-        return destinationType;
-    }
-
     public void setDestinationType(String destinationType) {
-        this.destinationType = destinationType;
-    }
-
-    public String getEndpointReference() {
-        return endpointReference;
-    }
-
-    public void setEndpointReference(String endpointReference) {
-        this.endpointReference = endpointReference;
+        if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) {
+            this.destinationType = JMSConstants.TOPIC;
+        } else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) {
+            this.destinationType = JMSConstants.QUEUE;
+        } else {
+            this.destinationType = JMSConstants.GENERIC;
+        }
+    }
+
+    public EndpointReference[] getEndpointReferences() {
+        return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]);
+    }
+
+    public void computeEPRs() {
+        List<EndpointReference> eprs = new ArrayList<EndpointReference>();
+        for (Object o : getService().getParameters()) {
+            Parameter p = (Parameter) o;
+            if (JMSConstants.PARAM_PUBLISH_EPR.equals(p.getName()) && p.getValue() instanceof String) {
+                if ("legacy".equalsIgnoreCase((String) p.getValue())) {
+                    // "legacy" is a placeholder for the EPR computed by JMSUtils.getEPR
+                    eprs.add(new EndpointReference(JMSUtils.getEPR(cf, destinationType, this)));
+                } else {
+                    eprs.add(new EndpointReference((String) p.getValue()));
+                }
+            }
+        }
+
+        if (eprs.isEmpty()) {
+            // no EPR was published explicitly, so fall back to the computed legacy EPR
+            eprs.add(new EndpointReference(JMSUtils.getEPR(cf, destinationType, this)));
+        }
+        endpointReferences.addAll(eprs);
+    }
 
     public ContentTypeRuleSet getContentTypeRuleSet() {
@@ -73,4 +100,12 @@
     public void setContentTypeRuleSet(ContentTypeRuleSet contentTypeRuleSet) {
         this.contentTypeRuleSet = contentTypeRuleSet;
     }
+
+    public JMSConnectionFactory getCf() {
+        return cf;
+    }
+
+    public void setCf(JMSConnectionFactory cf) {
+        this.cf = cf;
+    }
 }

Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java (original)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java Mon Dec  8 10:15:40 2008
@@ -24,7 +24,6 @@
 import org.apache.axis2.description.TransportInDescription;
 import org.apache.axis2.transport.base.AbstractTransportListener;
 import org.apache.axis2.transport.base.BaseConstants;
-import org.apache.axis2.transport.base.BaseUtils;
 import org.apache.axis2.transport.base.ManagementSupport;
 import org.apache.axis2.transport.base.event.TransportErrorListener;
 import org.apache.axis2.transport.base.event.TransportErrorSource;
@@ -41,88 +40,57 @@
 import javax.jms.TextMessage;
 
 /**
- * The JMS Transport listener implementation. A JMS Listner will hold one or
- * more JMS connection factories, which would be created at initialization
- * time. This implementation does not support the creation of connection
- * factories at runtime. This JMS Listener registers with Axis to be notified
- * of service deployment/undeployment/start and stop, and enables or disables
- * listening for messages on the destinations as appropriate.
- * <p/>
- * A Service could state the JMS connection factory name and the destination
- * name for use as Parameters in its services.xml as shown in the example
- * below. If the connection name was not specified, it will use the connection
- * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a
- * factory is defined in the Axis2.xml. If the destination name is not specified
- * it will default to a JMS queue by the name of the service. If the destination
- * should be a Topic, it should be created on the JMS implementation, and
- * specified in the services.xml of the service.
- * <p/>
- * <parameter name="transport.jms.ConnectionFactory" locked="true">
- * myTopicConnectionFactory</parameter>
- * <parameter name="transport.jms.Destination" locked="true">
- * dynamicTopics/something.TestTopic</parameter>
+ * The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances
+ * for each service requesting exposure over JMS, and stops these if they are undeployed / stopped.
+ * <p>
+ * A service refers to a JMS connection factory definition by name, which would be defined for the
+ * JMSListener in the axis2.xml, and this provides a way to reuse common configuration between
+ * services, as well as to optimize resource utilization
+ * <p>
+ * If the connection factory name is not specified, it will default to the one named "default"
+ * (see {@link JMSConstants#DEFAULT_CONFAC_NAME})
+ * <p>
+ * If a destination JNDI name is not specified, a service will expect to use a Queue with the same
+ * JNDI name as the service name. Additional parameters allow one to bind to a Topic or specify
+ * many more detailed control options. See package documentation for more details
+ * <p>
+ * All Destinations / JMS Administered objects used MUST be pre-created or already available 
  */
 public class JMSListener extends AbstractTransportListener implements ManagementSupport,
-        TransportErrorSource {
+    TransportErrorSource {
 
     public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
 
+    /** The JMSConnectionFactoryManager which centralizes the management of defined factories */
     private JMSConnectionFactoryManager connFacManager;
     /** A Map of service name to the JMS endpoints */
     private Map<String,JMSEndpoint> serviceNameToEndpointMap = new HashMap<String,JMSEndpoint>();
-
+    /** A Map of service name to its ServiceTaskManager instances */
+    private Map<String, ServiceTaskManager> serviceNameToSTMMap =
+        new HashMap<String, ServiceTaskManager>();
     private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this);
     
     /**
-     * This is the TransportListener initialization method invoked by Axis2
+     * TransportListener initialization
      *
-     * @param cfgCtx   the Axis configuration context
+     * @param cfgCtx the Axis configuration context
      * @param trpInDesc the TransportIn description
      */
     public void init(ConfigurationContext cfgCtx,
-                     TransportInDescription trpInDesc) throws AxisFault {
-        super.init(cfgCtx, trpInDesc);
-
-        connFacManager = new JMSConnectionFactoryManager(cfgCtx, this, workerPool);
-        // read the connection factory definitions and create them
-        connFacManager.loadConnectionFactoryDefinitions(trpInDesc);
-
-        // if no connection factories are defined, we cannot listen for any messages
-        if (connFacManager.getNames().length == 0) {
-            log.warn("No JMS connection factories are defined. Cannot listen for JMS");
-            return;
-        }
+        TransportInDescription trpInDesc) throws AxisFault {
 
+        super.init(cfgCtx, trpInDesc);
+        connFacManager = new JMSConnectionFactoryManager(trpInDesc);
         log.info("JMS Transport Receiver/Listener initialized...");
     }
 
     /**
-     * Start this JMS Listener (Transport Listener)
-     *
-     * @throws AxisFault
-     */
-    public void start() throws AxisFault {
-        connFacManager.start();
-        super.start();
-    }
-
-    /**
-     * Stop the JMS Listener, and shutdown all of the connection factories
-     */
-    public void stop() throws AxisFault {
-        super.stop();
-        connFacManager.stop();
-    }
-
-    /**
-     * Returns EPRs for the given service and IP over the JMS transport
+     * Returns EPRs for the given service over the JMS transport
      *
      * @param serviceName service name
-     * @param ip          ignored
-     * @return the EPR for the service
-     * @throws AxisFault not used
+     * @return the JMS EPRs for the service
      */
-    public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+    public EndpointReference[] getEPRsForService(String serviceName) {
         //Strip out the operation name
         if (serviceName.indexOf('/') != -1) {
             serviceName = serviceName.substring(0, serviceName.indexOf('/'));
@@ -133,28 +101,29 @@
         }
         JMSEndpoint endpoint = serviceNameToEndpointMap.get(serviceName);
         if (endpoint != null) {
-            return new EndpointReference[] { new EndpointReference(endpoint.getEndpointReference()) };
+            return endpoint.getEndpointReferences();
         } else {
             return null;
         }
     }
 
     /**
-     * Prepare to listen for JMS messages on behalf of the given service
+     * Listen for JMS messages on behalf of the given service
      *
-     * @param service the service for which to listen for messages
+     * @param service the Axis service for which to listen for messages
      */
     protected void startListeningForService(AxisService service) throws AxisFault {
         JMSConnectionFactory cf = getConnectionFactory(service);
         if (cf == null) {
             throw new AxisFault("The service doesn't specify a JMS connection factory or refers " +
-            		"to an invalid factory.");
+                "to an invalid factory.");
         }
 
         JMSEndpoint endpoint = new JMSEndpoint();
         endpoint.setService(service);
-        
-        Parameter destParam = service.getParameter(JMSConstants.DEST_PARAM);
+        endpoint.setCf(cf);
+
+        Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION);
         if (destParam != null) {
             endpoint.setJndiDestinationName((String)destParam.getValue());
         } else {
@@ -162,10 +131,10 @@
             endpoint.setJndiDestinationName(service.getName());
         }
         
-        Parameter destTypeParam = service.getParameter(JMSConstants.DEST_PARAM_TYPE);
+        Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE);
         if (destTypeParam != null) {
             String paramValue = (String) destTypeParam.getValue();
-            if(JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
+            if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
                     JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) )  {
                 endpoint.setDestinationType(paramValue);
             } else {
@@ -186,15 +155,30 @@
         } else {
             endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam));
         }
-        
-        // compute service EPR and keep for later use
-        endpoint.setEndpointReference(JMSUtils.getEPR(cf, endpoint));
+
+        endpoint.computeEPRs(); // compute service EPR and keep for later use        
         serviceNameToEndpointMap.put(service.getName(), endpoint);
         
-        log.info("Starting to listen on destination : " + endpoint.getJndiDestinationName() + " of type "
-                + endpoint.getDestinationType() + " for service " + service.getName());
-        cf.addDestination(endpoint);
-        cf.startListeningOnDestination(endpoint);
+        ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool);
+        stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint));
+        stm.start();
+        serviceNameToSTMMap.put(service.getName(), stm);
+
+        for (int i=0; i<3; i++) {
+            if (stm.getActiveTaskCount() > 0) {
+                log.info("Started to listen on destination : " + stm.getDestinationJNDIName() +
+                    " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
+                    " for service " + stm.getServiceName());
+                return;
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {}
+        }
+
+        log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() +
+            " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
+            " for service " + stm.getServiceName() + " have not yet started after 3 seconds ..");
     }
 
     /**
@@ -204,12 +188,22 @@
      */
     protected void stopListeningForService(AxisService service) {
 
-        JMSConnectionFactory cf = getConnectionFactory(service);
-        if (cf != null) {
-            // remove from the serviceNameToEprMap
-            JMSEndpoint endpoint = serviceNameToEndpointMap.remove(service.getName());
+        ServiceTaskManager stm = serviceNameToSTMMap.get(service.getName());
+        if (stm != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Stopping listening on destination : " + stm.getDestinationJNDIName() +
+                    " for service : " + stm.getServiceName());
+            }
+
+            stm.stop();
+
+            serviceNameToSTMMap.remove(service.getName());
+            serviceNameToEndpointMap.remove(service.getName());
+            log.info("Stopped listening for JMS messages to service : " + service.getName());
 
-            cf.removeDestination(endpoint.getJndiDestinationName());
+        } else {
+            log.error("Unable to stop service : " + service.getName() +
+                " - unable to find its ServiceTaskManager");
         }
     }
     /**
@@ -220,12 +214,12 @@
      * @param service the AxisService
      * @return the JMSConnectionFactory to be used, or null if reference is invalid
      */
-    private JMSConnectionFactory getConnectionFactory(AxisService service) {
-        Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM);
+    public JMSConnectionFactory getConnectionFactory(AxisService service) {
 
+        Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC);
         // validate connection factory name (specified or default)
         if (conFacParam != null) {
-            return connFacManager.getJMSConnectionFactory((String)conFacParam.getValue());
+            return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue());
         } else {
             return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME);
         }
@@ -240,11 +234,13 @@
     public void pause() throws AxisFault {
         if (state != BaseConstants.STARTED) return;
         try {
-            connFacManager.pause();
+            for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
+                stm.pause();
+            }
             state = BaseConstants.PAUSED;
             log.info("Listener paused");
         } catch (AxisJMSException e) {
-            log.error("At least one connection factory could not be paused", e);
+            log.error("At least one service could not be paused", e);
         }
     }
 
@@ -255,11 +251,13 @@
     public void resume() throws AxisFault {
         if (state != BaseConstants.PAUSED) return;
         try {
-            connFacManager.resume();
+            for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
+                stm.resume();
+            }
             state = BaseConstants.STARTED;
             log.info("Listener resumed");
         } catch (AxisJMSException e) {
-            log.error("At least one connection factory could not be resumed", e);
+            log.error("At least one service could not be resumed", e);
         }
     }