You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/12/08 19:15:41 UTC

svn commit: r724432 [4/4] - in /webservices/commons/trunk/modules/transport/modules: base/src/main/java/org/apache/axis2/transport/base/ jms/ jms/src/main/java/org/apache/axis2/transport/jms/ testkit/src/main/java/org/apache/axis2/transport/testkit/tes...

Added: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=724432&view=auto
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (added)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Mon Dec  8 10:15:40 2008
@@ -0,0 +1,1205 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.axis2.transport.base.BaseConstants;
+import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.InitialContext;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.transaction.UserTransaction;
+import javax.transaction.NotSupportedException;
+import javax.transaction.SystemException;
+import java.util.*;
+
+/**
+ * Each service will have one ServiceTaskManager instance that will create, manage and also destroy
+ * idle tasks created for it, for message receipt. This will also allow individual tasks to cache
+ * the Connection, Session or Consumer as necessary, considering the transactionality required and
+ * user preference.
+ *
+ * This also acts as the ExceptionListener for all JMS connections made on behalf of the service.
+ * Since the ExceptionListener is notified by a JMS provider on a "serious" error, we simply try
+ * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh
+ * for the service, by discarding all connections. 
+ */
+public class ServiceTaskManager {
+
+    /** The logger */
+    private static final Log log = LogFactory.getLog(ServiceTaskManager.class);
+
+    // State values, used both for the manager (serviceTaskManagerState) and for the
+    // individual polling tasks (MessageListenerTask.workerState)
+    /** The Task manager is stopped or has not started */
+    private static final int STATE_STOPPED = 0;
+    /** The Task manager is started and active */
+    private static final int STATE_STARTED = 1;
+    /** The Task manager is paused temporarily */
+    private static final int STATE_PAUSED = 2;
+    /** The Task manager is started, but a shutdown has been requested */
+    private static final int STATE_SHUTTING_DOWN = 3;
+    /** The Task manager has encountered an error */
+    private static final int STATE_FAILURE = 4;
+
+    /** The name of the service managed by this instance */
+    private String serviceName;
+    /** The ConnectionFactory MUST refer to an XAConnectionFactory to use JTA */
+    private String connFactoryJNDIName;
+    /** The JNDI name of the Destination Queue or Topic */
+    private String destinationJNDIName;
+    /** JNDI location for the JTA UserTransaction */
+    private String userTransactionJNDIName = "java:comp/UserTransaction";
+    /** The type of destination - P2P or PubSub (or JMS 1.1 API generic?) */
+    private int destinationType = JMSConstants.GENERIC;
+    /** An optional message selector */
+    private String messageSelector = null;
+
+    /** Should tasks run without transactions, using transacted Sessions (i.e. local), or JTA */
+    private int transactionality = BaseConstants.TRANSACTION_NONE;
+    /** Should created Sessions be transactional ? - should be false when using JTA */
+    private boolean sessionTransacted = true;
+    /** Session acknowledgement mode when transacted Sessions (i.e. local transactions) are used */
+    private int sessionAckMode = Session.AUTO_ACKNOWLEDGE;
+
+    /** Is the subscription durable ? */
+    private boolean subscriptionDurable = false;
+    /** The name of the durable subscriber for this client */
+    private String durableSubscriberName = null;
+    /** In PubSub mode, should I receive messages sent by me / my connection ? */
+    private boolean pubSubNoLocal = false;
+    /** Number of concurrent consumers - for PubSub, this should be 1 to prevent multiple receipt */
+    private int concurrentConsumers = 1;
+    /** Maximum number of consumers to create - see {@link #concurrentConsumers} */
+    private int maxConcurrentConsumers = 1;
+    /** The number of idle (i.e. message-less) attempts to be tried before suicide, to scale down */
+    private int idleTaskExecutionLimit = 10;
+    /** The maximum number of successful message receipts for a task - to limit thread life span */
+    private int maxMessagesPerTask = -1;    // default is unlimited
+    /** The default receive timeout in ms - a negative value means wait forever, zero don't wait at all */
+    private int receiveTimeout = 1000;
+    /** JMS Resource cache level - Connection, Session, Consumer. Auto will select safe default */
+    private int cacheLevel = JMSConstants.CACHE_AUTO;
+    /** Should we cache the UserTransaction handle from JNDI - true for almost all app servers */
+    private boolean cacheUserTransaction = true;
+    /** Shared UserTransaction handle */
+    private UserTransaction sharedUserTransaction = null;
+    /** Should this service use JMS 1.1 ? (when false, defaults to 1.0.2b) */
+    private boolean jmsSpec11 = true;
+
+    /** Initial duration (ms) to attempt re-connection to JMS provider after failure */
+    private int initialReconnectDuration = 10000;
+    /** Progression factor for the geometric series that calculates re-connection times */
+    private double reconnectionProgressionFactor = 2.0; // default to [bounded] exponential
+    /** Upper limit (ms) on reconnection attempt duration */
+    private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour
+
+    /** The JNDI context properties and other general properties */
+    private Hashtable<String,String> jmsProperties = new Hashtable<String, String>();
+    /** The JNDI Context acquired */
+    private Context context = null;
+    /** The ConnectionFactory to be used */
+    private ConnectionFactory conFactory = null;
+    /** The JMS Destination */
+    private Destination destination = null;
+
+    /**
+     * The list of active tasks that are managed by this instance. The list is a synchronized
+     * wrapper, so single operations are thread safe, but iteration must be performed while
+     * holding the list's own monitor.
+     */
+    private final List<MessageListenerTask> pollingTasks =
+        Collections.synchronizedList(new ArrayList<MessageListenerTask>());
+    /** The per-service JMS message receiver to be invoked after receipt of messages */
+    private JMSMessageReceiver jmsMessageReceiver = null;
+
+    /** State of this Task Manager - one of the STATE_* constants */
+    private volatile int serviceTaskManagerState = STATE_STOPPED;
+    /**
+     * Number of invoker tasks active. NOTE: volatile only guarantees visibility; the
+     * increment/decrement performed by the polling tasks is not atomic by itself.
+     */
+    private volatile int activeTaskCount = 0;
+    /** The shared thread pool from the Listener */
+    private WorkerPool workerPool = null;
+
+    /** The JMS Connection shared between multiple polling tasks - when enabled (recommended) */
+    private Connection sharedConnection = null;
+
+    /**
+     * Start or re-start the Task Manager by shutting down any existing worker tasks and
+     * re-creating them. However, if this is STM is PAUSED, a start request is ignored.
+     * This applies for any connection failures during paused state as well, which then will
+     * not try to auto recover
+     */
+    public synchronized void start() {
+
+        if (serviceTaskManagerState == STATE_PAUSED) {
+            log.info("Attempt to re-start paused TaskManager is ignored. Please use resume instead");
+            return;
+        }
+
+        // if any tasks are running, stop whats running now
+        if (!pollingTasks.isEmpty()) {
+            stop();
+        }
+
+        if (cacheLevel == JMSConstants.CACHE_AUTO) {
+			cacheLevel = 
+                transactionality == BaseConstants.TRANSACTION_NONE ?
+                    JMSConstants.CACHE_CONSUMER : JMSConstants.CACHE_NONE;
+        }
+        switch (cacheLevel) {
+            case JMSConstants.CACHE_NONE:
+                log.debug("No JMS resources will be cached/shared between poller " +
+                    "worker tasks of service : " + serviceName);
+                break;
+            case JMSConstants.CACHE_CONNECTION:
+                log.debug("Only the JMS Connection will be cached and shared between *all* " +
+                    "poller task invocations");
+                break;
+            case JMSConstants.CACHE_SESSION:
+                log.debug("The JMS Connection and Session will be cached and shared between " +
+                    "successive poller task invocations");
+                break;
+            case JMSConstants.CACHE_CONSUMER:
+                log.debug("The JMS Connection, Session and MessageConsumer will be cached and " +
+                    "shared between successive poller task invocations");
+                break;
+            default : {
+                handleException("Invalid cache level : " + cacheLevel +
+                    " for service : " + serviceName);
+            }
+        }
+
+        for (int i=0; i<concurrentConsumers; i++) {
+            workerPool.execute(new MessageListenerTask());
+        }
+
+        serviceTaskManagerState = STATE_STARTED;
+        log.info("Task manager for service : " + serviceName + " [re-]initialized");
+    }
+
+    /**
+     * Shutdown the tasks and release any shared resources
+     */
+    public synchronized void stop() {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Stopping ServiceTaskManager for service : " + serviceName);
+        }
+
+        if (serviceTaskManagerState != STATE_FAILURE) {
+            serviceTaskManagerState = STATE_SHUTTING_DOWN;
+        }
+
+        synchronized(pollingTasks) {
+            for (MessageListenerTask lstTask : pollingTasks) {
+                lstTask.requestShutdown();
+            }
+        }
+
+        // try to wait a bit for task shutdown
+        for (int i=0; i<5; i++) {
+            if (activeTaskCount == 0) {
+                break;
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {}
+        }
+
+        if (sharedConnection != null) {
+            try {
+                sharedConnection.stop();
+            } catch (JMSException e) {
+                logError("Error stopping shared Connection", e);
+            } finally {
+                sharedConnection = null;
+            }
+        }
+
+        if (activeTaskCount > 0) {
+            log.warn("Unable to shutdown all polling tasks of service : " + serviceName);
+        }
+
+        if (serviceTaskManagerState != STATE_FAILURE) {
+            serviceTaskManagerState = STATE_STOPPED;
+        }
+        log.info("Task manager for service : " + serviceName + " shutdown");
+    }
+
+    /**
+     * Temporarily suspend receipt and processing of messages. Accomplished by pausing each
+     * polling task (which stops its own connection when that is not shared) and then stopping
+     * the shared connection, if there is one. Errors stopping the shared connection are logged.
+     */
+    public synchronized void pause() {
+        // Tasks add and remove themselves from other threads, so the list must not be iterated
+        // without its lock. Work on a snapshot so the lock is not held during JMS calls.
+        final List<MessageListenerTask> tasks;
+        synchronized(pollingTasks) {
+            tasks = new ArrayList<MessageListenerTask>(pollingTasks);
+        }
+        for (MessageListenerTask lstTask : tasks) {
+            lstTask.pause();
+        }
+        if (sharedConnection != null) {
+            try {
+                sharedConnection.stop();
+            } catch (JMSException e) {
+                logError("Error pausing shared Connection", e);
+            }
+        }
+    }
+
+    /**
+     * Resume receipt and processing of messages of paused tasks. Each polling task is resumed
+     * (which restarts its own connection when that is not shared) and then the shared
+     * connection, if there is one, is started again. Errors starting it are logged.
+     */
+    public synchronized void resume() {
+        // Tasks add and remove themselves from other threads, so the list must not be iterated
+        // without its lock. Work on a snapshot so the lock is not held during JMS calls.
+        final List<MessageListenerTask> tasks;
+        synchronized(pollingTasks) {
+            tasks = new ArrayList<MessageListenerTask>(pollingTasks);
+        }
+        for (MessageListenerTask lstTask : tasks) {
+            lstTask.resume();
+        }
+        if (sharedConnection != null) {
+            try {
+                sharedConnection.start();
+            } catch (JMSException e) {
+                logError("Error resuming shared Connection", e);
+            }
+        }
+    }
+
+    /**
+     * Scale up listening: submit one more MessageListenerTask to the worker pool, but only
+     * when this manager is in the STARTED state, the number of registered tasks is still below
+     * the configured maximum, and none of the registered tasks is currently idle.
+     */
+    private void scheduleNewTaskIfAppropriate() {
+        if (serviceTaskManagerState != STATE_STARTED) {
+            return;     // not active
+        }
+        if (pollingTasks.size() >= getMaxConcurrentConsumers()) {
+            return;     // threshold reached
+        }
+        if (getIdleTaskCount() != 0) {
+            return;     // an idle task can pick up the work
+        }
+        workerPool.execute(new MessageListenerTask());
+    }
+
+    /**
+     * Get the number of MessageListenerTasks that are currently idle, i.e. whose last poll
+     * did not return a message.
+     * @return idle task count
+     */
+    private int getIdleTaskCount() {
+        int count = 0;
+        // the synchronized list only protects single operations; iterating requires its lock,
+        // as tasks register and unregister themselves concurrently
+        synchronized(pollingTasks) {
+            for (MessageListenerTask lstTask : pollingTasks) {
+                if (lstTask.isTaskIdle()) {
+                    count++;
+                }
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Get the number of MessageListenerTasks that are currently flagged as connected to the
+     * JMS provider.
+     * @return connected task count
+     */
+    private int getConnectedTaskCount() {
+        int count = 0;
+        // the synchronized list only protects single operations; iterating requires its lock,
+        // as tasks register and unregister themselves concurrently
+        synchronized(pollingTasks) {
+            for (MessageListenerTask lstTask : pollingTasks) {
+                if (lstTask.isConnected()) {
+                    count++;
+                }
+            }
+        }
+        return count;
+    }
+
+    /**
+     * The actual threads/tasks that perform message polling
+     */
+    private class MessageListenerTask implements Runnable, ExceptionListener {
+
+        /** The Connection used by the polling task */
+        private Connection connection = null;
+        /** The Session used by the polling task */
+        private Session session = null;
+        /** The MessageConsumer used by the polling task */
+        private MessageConsumer consumer = null;
+        /** State of the worker polling task - one of the STATE_* constants */
+        private volatile int workerState = STATE_STOPPED;
+        /** The number of idle (i.e. without fetching a message) polls for this task */
+        private int idleExecutionCount = 0;
+        /** Is this task idle right now? */
+        private volatile boolean idle = false;
+        /**
+         * Is this task connected to the JMS provider successfully? Volatile, like the other
+         * status flags, because it is written by the polling thread and by the exception
+         * listener callback, and read when connected tasks are counted.
+         */
+        private volatile boolean connected = false;
+
+        /**
+         * As soon as we create a new polling task, add it to the STM for control later.
+         * The task therefore appears in the list of polling tasks before it has been handed
+         * to the worker pool and before run() has started; it is removed again when run() ends.
+         */
+        MessageListenerTask() {
+            synchronized(pollingTasks) {
+                pollingTasks.add(this);
+            }
+        }
+
+        /**
+         * Pause this polling worker task. Has an effect only while the task is active: a
+         * non-shared connection (cache level below CACHE_CONNECTION) is stopped, and the task
+         * state is set to PAUSED. A failure to stop the connection is logged with its cause.
+         *
+         * NOTE(review): run() loops only while the task state is STARTED, so a paused task
+         * appears to leave its polling loop - confirm that this is intended.
+         */
+        public void pause() {
+            if (isActive()) {
+                if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
+                    try {
+                        connection.stop();
+                    } catch (JMSException e) {
+                        // keep the cause - the message alone does not say why pausing failed
+                        log.warn("Error pausing Message Listener task for service : " +
+                            serviceName, e);
+                    }
+                }
+                workerState = STATE_PAUSED;
+            }
+        }
+
+        /**
+         * Resume this polling task. A non-shared connection (cache level below
+         * CACHE_CONNECTION) is started again and the task state is set to STARTED. A failure
+         * to start the connection is logged with its cause.
+         */
+        public void resume() {
+            if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
+                try {
+                    connection.start();
+                } catch (JMSException e) {
+                    // keep the cause - the message alone does not say why resuming failed
+                    log.warn("Error resuming Message Listener task for service : " +
+                        serviceName, e);
+                }
+            }
+            workerState = STATE_STARTED;
+        }
+
+        /**
+         * Execute the polling worker task.
+         *
+         * Polls for messages for as long as the task is active, the per-task message limit
+         * (if any) has not been reached, and - when more than one concurrent consumer is
+         * configured - the idle poll limit has not been exceeded. Each received message is
+         * passed to handleMessage, after giving the manager a chance to start another task.
+         *
+         * When JTA is used, a transaction is begun only if the thread is not already associated
+         * with one: an idle poll leaves the transaction it started open, and beginning another
+         * on top of it would fail on transaction managers without nested transaction support.
+         *
+         * On exit - normal or through an exception - the consumer, session and (non-shared)
+         * connection are released, the task is unregistered and the active task count is
+         * decremented. After a normal exit a replacement task is scheduled if appropriate.
+         */
+        public void run() {
+            workerState = STATE_STARTED;
+            // volatile does not make ++ atomic; serialize the update on the task list lock,
+            // which is also used for the matching decrement below
+            synchronized(pollingTasks) {
+                activeTaskCount++;
+            }
+            int messageCount = 0;
+
+            if (log.isDebugEnabled()) {
+                log.debug("New poll task starting : thread id = " + Thread.currentThread().getId());
+            }
+
+            try {
+                while (isActive() &&
+                    (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) &&
+                    (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) {
+
+                    UserTransaction ut = null;
+                    try {
+                        if (transactionality == BaseConstants.TRANSACTION_JTA) {
+                            ut = getUserTransaction();
+                            // re-use the transaction left open by a previous idle poll
+                            if (ut.getStatus() ==
+                                javax.transaction.Status.STATUS_NO_TRANSACTION) {
+                                ut.begin();
+                            }
+                        }
+                    } catch (NotSupportedException e) {
+                        handleException("Listener Task is already associated with a transaction", e);
+                    } catch (SystemException e) {
+                        handleException("Error starting a JTA transaction", e);
+                    }
+
+                    // Get a message by polling, or receive null
+                    Message message = receiveMessage();
+
+                    if (log.isTraceEnabled()) {
+                        if (message != null) {
+                            try {
+                                log.trace("<<<<<<< READ message with Message ID : " +
+                                    message.getJMSMessageID() + " from : " + destination +
+                                    " by Thread ID : " + Thread.currentThread().getId());
+                            } catch (JMSException ignore) {}
+                        } else {
+                            log.trace("No message received by Thread ID : " +
+                                Thread.currentThread().getId() + " for destination : " + destination);
+                        }
+                    }
+
+                    if (message != null) {
+                        idle = false;
+                        idleExecutionCount = 0;
+                        messageCount++;
+                        // I will be busy now while processing this message, so start another if needed
+                        scheduleNewTaskIfAppropriate();
+                        handleMessage(message, ut);
+
+                    } else {
+                        idle = true;
+                        idleExecutionCount++;
+                    }
+                }
+
+            } finally {
+                workerState = STATE_STOPPED;
+                try {
+                    // release the JMS resources here, so that they are also freed when the
+                    // loop above is left through an exception
+                    closeConsumer(true);
+                    closeSession(true);
+                    closeConnection();
+                } finally {
+                    synchronized(pollingTasks) {
+                        activeTaskCount--;
+                        pollingTasks.remove(this);
+                    }
+                }
+            }
+
+            if (log.isTraceEnabled()) {
+                log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() +
+                    " is stopping after processing : " + messageCount + " messages :: " +
+                    " isActive : " + isActive() + " maxMessagesPerTask : " +
+                    getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() +
+                    " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " +
+                    getIdleTaskExecutionLimit());
+            } else if (log.isDebugEnabled()) {
+                log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() +
+                    " is stopping after processing : " + messageCount + " messages");
+            }
+
+            // My time is up, so if I am going away, create another
+            scheduleNewTaskIfAppropriate();
+        }
+
+        /**
+         * Poll for and return a message if available
+         *
+         * @return a message read, or null
+         */
+        private Message receiveMessage() {
+
+            // get a new connection, session and consumer to prevent a conflict.
+            // If idle, it means we can re-use what we already have 
+            if (consumer == null) {
+                connection = getConnection();
+                session = getSession();
+                consumer = getMessageConsumer();
+                if (log.isDebugEnabled()) {
+                    log.debug("Preparing a Connection, Session and Consumer to read messages");
+                }
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Waiting for a message for service : " + serviceName + " - duration : "
+                    + (getReceiveTimeout() < 0 ? "unlimited" : (getReceiveTimeout() + "ms")));
+            }
+
+            try {
+                if (getReceiveTimeout() < 0) {
+                    return consumer.receive();
+                } else {
+                    return consumer.receive(getReceiveTimeout());
+                }
+            } catch (IllegalStateException ignore) {
+                // probably the consumer (shared) was closed.. which is still ok.. as we didn't read
+            } catch (JMSException e) {
+                logError("Error receiving message for service : " + serviceName, e);
+            }
+            return null;
+        }
+
+        /**
+         * Invoke ultimate message handler/listener and ack message and/or
+         * commit/rollback transactions.
+         *
+         * The outcome defaults to "do not commit": if the message receiver throws instead of
+         * returning, the message is not acknowledged and any local session transaction or JTA
+         * transaction is rolled back, rather than being committed for a message whose
+         * processing did not complete. The exception itself still propagates to the caller.
+         *
+         * Afterwards the consumer, session and connection are released as far as the cache
+         * level allows. Failures while acknowledging, committing or rolling back are logged.
+         *
+         * @param message the JMS message received
+         * @param ut the UserTransaction used to receive this message, or null
+         */
+        private void handleMessage(Message message, UserTransaction ut) {
+
+            String messageId = null;
+            try {
+                messageId = message.getJMSMessageID();
+            } catch (JMSException ignore) {}
+
+            // stays false if onMessage() throws, so that the finally block rolls back
+            boolean commitOrAck = false;
+            try {
+                commitOrAck = jmsMessageReceiver.onMessage(message, ut);
+
+            } finally {
+
+                // if client acknowledgement is selected, and processing requested ACK
+                if (commitOrAck && getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) {
+                    try {
+                        message.acknowledge();
+                        if (log.isDebugEnabled()) {
+                            log.debug("Message : " + messageId + " acknowledged");
+                        }
+                    } catch (JMSException e) {
+                        logError("Error acknowledging message : " + messageId, e);
+                    }
+                }
+
+                // close the consumer
+                closeConsumer(false);
+
+                // if session was transacted, commit it or rollback
+                try {
+                    if (session.getTransacted()) {
+                        if (commitOrAck) {
+                            session.commit();
+                            if (log.isDebugEnabled()) {
+                                log.debug("Session for message : " + messageId + " committed");
+                            }
+                        } else {
+                            session.rollback();
+                            if (log.isDebugEnabled()) {
+                                log.debug("Session for message : " + messageId + " rolled back");
+                            }
+                        }
+                    }
+                } catch (JMSException e) {
+                    logError("Error " + (commitOrAck ? "committing" : "rolling back") +
+                        " local session txn for message : " + messageId, e);
+                }
+
+                // if a JTA transaction was being used, commit it or rollback
+                try {
+                    if (ut != null) {
+                        if (commitOrAck) {
+                            ut.commit();
+                            if (log.isDebugEnabled()) {
+                                log.debug("JTA txn for message : " + messageId + " committed");
+                            }
+                        } else {
+                            ut.rollback();
+                            if (log.isDebugEnabled()) {
+                                log.debug("JTA txn for message : " + messageId + " rolled back");
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    logError("Error " + (commitOrAck ? "committing" : "rolling back") +
+                        " JTA txn for message : " + messageId + " from the session", e);
+                }
+
+                closeSession(false);
+                closeConnection();
+            }
+        }
+
+        /**
+         * Handle JMS Connection exceptions by re-initializing. A single connection failure could
+         * cause re-initialization of multiple MessageListenerTasks / Connections.
+         *
+         * If the manager is not active, or the failed connection was not shared, only this task
+         * is asked to shut down. Otherwise the manager is flagged as failed and restarted
+         * repeatedly until it is active again and all concurrent consumers report as connected.
+         * After each failed attempt the wait grows by the reconnection progression factor, up
+         * to the configured maximum; the duration logged is the duration actually waited.
+         *
+         * If the calling thread is interrupted while waiting, its interrupt status is restored
+         * and the reconnection attempts are abandoned.
+         *
+         * @param j the exception reported by the JMS provider
+         */
+        public void onException(JMSException j) {
+
+            if (!isSTMActive()) {
+                requestShutdown();
+                return;
+            }
+
+            log.warn("JMS Connection failure : " + j.getMessage());
+            setConnected(false);
+
+            if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+                // failed Connection was not shared, thus no need to restart the whole STM
+                requestShutdown();
+                return;
+            }
+
+            // if we failed while active, update state to show failure
+            setServiceTaskManagerState(STATE_FAILURE);
+            log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks");
+
+            int r = 1;
+            long retryDuration = initialReconnectDuration;
+
+            do {
+                try {
+                    log.info("Reconnection attempt : " + r + " for service : " + serviceName);
+                    start();
+                } catch (Exception e) {
+                    // failures are expected while the provider is down, and the attempt is
+                    // reported below - but keep the cause available for diagnosis
+                    if (log.isDebugEnabled()) {
+                        log.debug("Reconnection attempt : " + r + " for service : " +
+                            serviceName + " could not restart the task manager", e);
+                    }
+                }
+
+                // give the new tasks up to ~5 seconds to connect
+                boolean reconnected = false;
+                for (int i=0; i<5; i++) {
+                    if (getConnectedTaskCount() == concurrentConsumers) {
+                        reconnected = true;
+                        break;
+                    }
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        log.warn("Interrupted while reconnecting service : " + serviceName +
+                            " - giving up");
+                        return;
+                    }
+                }
+
+                if (!reconnected) {
+                    // compute the next wait first, so that what is logged is what is slept
+                    retryDuration = (long) (retryDuration * reconnectionProgressionFactor);
+                    if (retryDuration > maxReconnectDuration) {
+                        retryDuration = maxReconnectDuration;
+                    }
+                    log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName +
+                        " failed. Next retry in " + (retryDuration/1000) + " seconds");
+
+                    try {
+                        Thread.sleep(retryDuration);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        log.warn("Interrupted while reconnecting service : " + serviceName +
+                            " - giving up");
+                        return;
+                    }
+                }
+
+            } while (!isSTMActive() || getConnectedTaskCount() < concurrentConsumers);
+        }
+
+        /**
+         * Ask this task to shut down by moving it to the SHUTTING_DOWN state. The polling loop
+         * in run() only continues while the task is active, so it ends after the current poll.
+         */
+        protected void requestShutdown() {
+            workerState = STATE_SHUTTING_DOWN;
+        }
+
+        /**
+         * Is this task in the STARTED state?
+         * @return true if the worker state is STATE_STARTED
+         */
+        private boolean isActive() {
+            return workerState == STATE_STARTED;
+        }
+
+        /**
+         * Is this task idle, i.e. did its most recent poll return without a message?
+         * @return the idle flag maintained by run()
+         */
+        protected boolean isTaskIdle() {
+            return idle;
+        }
+
+        /**
+         * Is this task flagged as connected to the JMS provider?
+         * @return the connected flag
+         */
+        public boolean isConnected() {
+            return connected;
+        }
+
+        /**
+         * Set the flag that records whether this task is connected to the JMS provider
+         * @param connected the new value of the flag
+         */
+        public void setConnected(boolean connected) {
+            this.connected = connected;
+        }
+
+        /**
+         * Get a Connection that could/should be used by this task - depends on the cache level
+         * to reuse. Below CACHE_CONNECTION the task keeps a Connection of its own, created on
+         * first use. Otherwise the Connection shared by all tasks of this manager is used, and
+         * created if it does not exist yet.
+         *
+         * The shared Connection is created while holding the monitor of the enclosing
+         * ServiceTaskManager - the same monitor its synchronized stop(), pause() and resume()
+         * hold while touching that Connection. Locking on the task itself would not exclude
+         * other tasks, and several "shared" Connections could be created concurrently.
+         *
+         * The task is flagged as connected before returning.
+         *
+         * @return the shared Connection if the cache level is CACHE_CONNECTION or higher,
+         * or else the Connection owned by this task
+         */
+        private Connection getConnection() {
+            if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+                // Connection is not shared
+                if (connection == null) {
+                    connection = createConnection();
+                }
+            } else {
+                synchronized(ServiceTaskManager.this) {
+                    if (sharedConnection == null) {
+                        sharedConnection = createConnection();
+                    }
+                    connection = sharedConnection;
+                }
+            }
+            setConnected(true);
+            return connection;
+        }
+
+        /**
+         * Get a Session that could/should be used by this task - depends on the cache level to reuse
+         * @param connection the connection (could be the shared connection) to use to create a Session
+         * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session
+         * created using the Connection passed, or a new/shared connection
+         */
+        private Session getSession() {
+            if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) {
+                session = createSession();
+            }
+            return session;
+        }
+
+        /**
+         * Get a MessageConsumer that chould/should be used by this task - depends on the cache
+         * level to reuse
+         * @param connection option Connection to be used
+         * @param session optional Session to be used
+         * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new
+         * MessageConsumer possibly using the Connection and Session passed in
+         */
+        private MessageConsumer getMessageConsumer() {
+            if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) {
+                consumer = createConsumer();
+            }
+            return consumer;
+        }
+
+        /**
+         * Close the given Connection, hiding exceptions if any which are logged
+         * @param connection the Connection to be closed
+         */
+        private void closeConnection() {
+            if (connection != null &&
+                cacheLevel < JMSConstants.CACHE_CONNECTION) {
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Closing non-shared JMS connection for service : " + serviceName);
+                    }
+                    connection.close();
+                } catch (JMSException e) {
+                    logError("Error closing JMS connection", e);
+                } finally {
+                    connection = null;
+                }
+            }
+        }
+
+        /**
+         * Close the given Session, hiding exceptions if any which are logged
+         * @param session the Session to be closed
+         */
+        private void closeSession(boolean forced) {
+            if (session != null &&
+                (cacheLevel < JMSConstants.CACHE_SESSION || forced)) {
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Closing non-shared JMS session for service : " + serviceName);
+                    }
+                    session.close();
+                } catch (JMSException e) {
+                    logError("Error closing JMS session", e);
+                } finally {
+                    session = null;
+                }
+            }
+        }
+
+        /**
+         * Close the given Consumer, hiding exceptions if any which are logged
+         * @param consumer the Consumer to be closed
+         */
+        private void closeConsumer(boolean forced) {
+            if (consumer != null &&
+                (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) {
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Closing non-shared JMS consumer for service : " + serviceName);
+                    }
+                    consumer.close();
+                } catch (JMSException e) {
+                    logError("Error closing JMS consumer", e);
+                } finally {
+                    consumer = null;
+                }
+            }
+        }
+
+        /**
+         * Create and start a new Connection for this STM. The ConnectionFactory is looked up from
+         * JNDI using the configured JNDI properties, and the Connection is created with the JMS
+         * username and password found in those properties. This object is registered as the
+         * ExceptionListener of the Connection before the Connection is started.
+         * <p>
+         * If the Connection was created but could not be configured or started, it is closed
+         * before the failure is reported, so that a half-initialized Connection is not leaked.
+         *
+         * @return a new, started Connection
+         * @throws AxisJMSException if the ConnectionFactory lookup fails, or if the Connection
+         * cannot be created or started
+         */
+        private Connection createConnection() {
+
+            try {
+                conFactory = JMSUtils.lookup(
+                    getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName());
+                log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName());
+            } catch (NamingException e) {
+                handleException("Error looking up connection factory : " + getConnFactoryJNDIName() +
+                    " using JNDI properties : " + jmsProperties, e);
+            }
+
+            Connection newConnection = null;
+            try {
+                newConnection = JMSUtils.createConnection(
+                    conFactory,
+                    jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME),
+                    jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD),
+                    isJmsSpec11(), isQueue());
+
+                newConnection.setExceptionListener(this);
+                newConnection.start();
+                log.info("JMS Connection for service : " + serviceName + " created and started");
+
+            } catch (JMSException e) {
+                if (newConnection != null) {
+                    // created but unusable - release it here, as no one else holds a reference
+                    try {
+                        newConnection.close();
+                    } catch (JMSException ce) {
+                        logError("Error closing JMS connection", ce);
+                    }
+                }
+                handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() +
+                    " using JNDI properties : " + jmsProperties, e);
+            }
+            return newConnection;
+        }
+
+        /**
+         * Create a new Session for this STM on the Connection currently held, using the
+         * configured transacted flag and acknowledgement mode
+         * @return a new Session created using the Connection currently held
+         */
+        private Session createSession() {
+            if (log.isDebugEnabled()) {
+                log.debug("Creating a new JMS Session for service : " + serviceName);
+            }
+
+            Session newSession = null;
+            try {
+                newSession = JMSUtils.createSession(
+                    connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue());
+            } catch (JMSException e) {
+                // handleException() always throws, so null is never returned on this path
+                handleException("Error creating JMS session for service : " + serviceName, e);
+            }
+            return newSession;
+        }
+
+        /**
+         * Create a new MessageConsumer for this STM on the Session currently held. For a durable
+         * subscription the configured durable subscriber name is passed to
+         * JMSUtils.createConsumer() as the subscriber name when one has been set; in every other
+         * case the service name is passed instead
+         * @return a new MessageConsumer created using the Session currently held
+         * @throws AxisJMSException if the MessageConsumer cannot be created
+         */
+        private MessageConsumer createConsumer() {
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Creating a new JMS MessageConsumer for service : " + serviceName);
+                }
+
+                // use the explicit durable subscriber name only when it is actually available;
+                // testing for "== null" here would hand a null name to a durable subscription
+                return JMSUtils.createConsumer(
+                    session, getDestination(session), isQueue(),
+                    (isSubscriptionDurable() && getDurableSubscriberName() != null ?
+                        getDurableSubscriberName() : serviceName),
+                    getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11());
+
+            } catch (JMSException e) {
+                handleException("Error creating JMS consumer for service : " + serviceName,e);
+            }
+            return null;
+        }
+    }
+
+    // -------------- mundane private methods ----------------
+    /**
+     * Return the InitialContext used for JNDI lookups. It is created from the JMS/JNDI
+     * properties of this STM on the first call, and the same instance is returned afterwards
+     * @return the InitialContext to be used
+     * @throws NamingException if the InitialContext cannot be created
+     */
+    private Context getInitialContext() throws NamingException {
+        if (context != null) {
+            return context;
+        }
+        context = new InitialContext(jmsProperties);
+        return context;
+    }
+
+    /**
+     * Return the JMS Destination this STM listens on, resolving it on first use and reusing the
+     * result afterwards. The Destination is first looked up from JNDI by its configured JNDI
+     * name; if that lookup fails, the same name is used to create a Queue or a Topic on the
+     * given Session, according to the configured destination type
+     * @param session the Session used to create the Destination when the JNDI lookup fails
+     * @return the JMS Destination to which this STM listens for messages
+     * @throws AxisJMSException if the lookup fails and the Destination cannot be created
+     */
+    private Destination getDestination(Session session) {
+        if (destination == null) {
+            try {
+                context = getInitialContext();
+                destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName());
+                if (log.isDebugEnabled()) {
+                    log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() +
+                        " found for service " + serviceName);
+                }
+            } catch (NamingException e) {
+                // keep a trace of the lookup failure, as a later failure reports its own cause
+                if (log.isDebugEnabled()) {
+                    log.debug("JNDI lookup of JMS destination : " + getDestinationJNDIName() +
+                        " failed, attempting to create it using the session", e);
+                }
+                try {
+                    switch (destinationType) {
+                        case JMSConstants.QUEUE: {
+                            destination = session.createQueue(getDestinationJNDIName());
+                            break;
+                        }
+                        case JMSConstants.TOPIC: {
+                            destination = session.createTopic(getDestinationJNDIName());
+                            break;
+                        }
+                        default: {
+                            handleException("Error looking up JMS destination : " +
+                                getDestinationJNDIName() + " using JNDI properties : " +
+                                jmsProperties, e);
+                        }
+                    }
+                } catch (JMSException j) {
+                    // report the JMSException that made the fallback fail, not the earlier
+                    // NamingException, which would hide the actual reason
+                    handleException("Error looking up and creating JMS destination : " +
+                        getDestinationJNDIName() + " using JNDI properties : " + jmsProperties, j);
+                }
+            }
+        }
+        return destination;
+    }
+
+    /**
+     * Return the UserTransaction to be used, looked up from JNDI by the configured
+     * UserTransaction JNDI name. When caching of the UserTransaction is disabled a fresh lookup
+     * is made on every call; otherwise the result of the first successful lookup is kept and
+     * returned by later calls
+     * @return the UserTransaction to be used, looked up from the JNDI
+     * @throws AxisJMSException if the JNDI lookup fails
+     */
+    private UserTransaction getUserTransaction() {
+        if (!cacheUserTransaction) {
+            if (log.isDebugEnabled()) {
+                log.debug("Acquiring a new UserTransaction for service : " + serviceName);
+            }
+
+            try {
+                context = getInitialContext();
+                return
+                    JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName());
+            } catch (NamingException e) {
+                // name the UserTransaction that was looked up, not the JMS destination
+                handleException("Error looking up UserTransaction : " +
+                    getUserTransactionJNDIName() + " using JNDI properties : " + jmsProperties, e);
+            }
+        }
+        
+        if (sharedUserTransaction == null) {
+            try {
+                context = getInitialContext();
+                sharedUserTransaction =
+                    JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName());
+                if (log.isDebugEnabled()) {
+                    log.debug("Acquired shared UserTransaction for service : " + serviceName);
+                }
+            } catch (NamingException e) {
+                // name the UserTransaction that was looked up, not the JMS destination
+                handleException("Error looking up UserTransaction : " +
+                    getUserTransactionJNDIName() + " using JNDI properties : " + jmsProperties, e);
+            }
+        }
+        return sharedUserTransaction;
+    }
+
+    // -------------------- trivial methods ---------------------
+    private boolean isSTMActive() {
+        return serviceTaskManagerState == STATE_STARTED;
+    }
+
+    /**
+     * Tell whether this STM is bound to a Queue, a Topic or a JMS 1.1 Generic Destination,
+     * based on the configured destination type
+     * @return TRUE for a Queue, FALSE for a Topic and NULL for a Generic Destination
+     */
+    private Boolean isQueue() {
+        if (destinationType == JMSConstants.GENERIC) {
+            // a generic destination is neither specifically a Queue nor a Topic
+            return null;
+        }
+        return destinationType == JMSConstants.QUEUE;
+    }
+
+    private void logError(String msg, Exception e) {
+        log.error(msg, e);
+    }
+
+    private void handleException(String msg, Exception e) {
+        log.error(msg, e);
+        throw new AxisJMSException(msg, e);
+    }
+
+    private void handleException(String msg) {
+        log.error(msg);
+        throw new AxisJMSException(msg);
+    }
+
+    // -------------- getters and setters ------------------
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public void setServiceName(String serviceName) {
+        this.serviceName = serviceName;
+    }
+
+    public String getConnFactoryJNDIName() {
+        return connFactoryJNDIName;
+    }
+
+    public void setConnFactoryJNDIName(String connFactoryJNDIName) {
+        this.connFactoryJNDIName = connFactoryJNDIName;
+    }
+
+    public String getDestinationJNDIName() {
+        return destinationJNDIName;
+    }
+
+    public void setDestinationJNDIName(String destinationJNDIName) {
+        this.destinationJNDIName = destinationJNDIName;
+    }
+
+    public int getDestinationType() {
+        return destinationType;
+    }
+
+    public void setDestinationType(int destinationType) {
+        this.destinationType = destinationType;
+    }
+
+    public String getMessageSelector() {
+        return messageSelector;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    public int getTransactionality() {
+        return transactionality;
+    }
+
+    public void setTransactionality(int transactionality) {
+        this.transactionality = transactionality;
+        sessionTransacted = (transactionality == BaseConstants.TRANSACTION_LOCAL);
+    }
+
+    public boolean isSessionTransacted() {
+        return sessionTransacted;
+    }
+
+    /**
+     * Set whether the JMS Session should be transacted. A null value is ignored and leaves the
+     * current settings untouched
+     * @param sessionTransacted TRUE to use a transacted Session, FALSE otherwise, or null to
+     * keep the current setting
+     */
+    public void setSessionTransacted(Boolean sessionTransacted) {
+        if (sessionTransacted == null) {
+            return;
+        }
+        this.sessionTransacted = sessionTransacted;
+        // sessionTransacted means local transactions are used, however !sessionTransacted does
+        // not mean that JTA is used, so the transactionality is left as it is in that case
+        if (sessionTransacted) {
+            transactionality = BaseConstants.TRANSACTION_LOCAL;
+        }
+    }
+
+    public int getSessionAckMode() {
+        return sessionAckMode;
+    }
+
+    public void setSessionAckMode(int sessionAckMode) {
+        this.sessionAckMode = sessionAckMode;
+    }
+
+    public boolean isSubscriptionDurable() {
+        return subscriptionDurable;
+    }
+
+    public void setSubscriptionDurable(Boolean subscriptionDurable) {
+        if (subscriptionDurable != null) {
+            this.subscriptionDurable = subscriptionDurable;
+        }
+    }
+
+    public String getDurableSubscriberName() {
+        return durableSubscriberName;
+    }
+
+    public void setDurableSubscriberName(String durableSubscriberName) {
+        this.durableSubscriberName = durableSubscriberName;
+    }
+
+    public boolean isPubSubNoLocal() {
+        return pubSubNoLocal;
+    }
+
+    public void setPubSubNoLocal(Boolean pubSubNoLocal) {
+        if (pubSubNoLocal != null) {
+            this.pubSubNoLocal = pubSubNoLocal;
+        }
+    }
+
+    public int getConcurrentConsumers() {
+        return concurrentConsumers;
+    }
+
+    public void setConcurrentConsumers(int concurrentConsumers) {
+        this.concurrentConsumers = concurrentConsumers;
+    }
+
+    public int getMaxConcurrentConsumers() {
+        return maxConcurrentConsumers;
+    }
+
+    public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
+        this.maxConcurrentConsumers = maxConcurrentConsumers;
+    }
+
+    public int getIdleTaskExecutionLimit() {
+        return idleTaskExecutionLimit;
+    }
+
+    public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
+        this.idleTaskExecutionLimit = idleTaskExecutionLimit;
+    }
+
+    public int getReceiveTimeout() {
+        return receiveTimeout;
+    }
+
+    public void setReceiveTimeout(int receiveTimeout) {
+        this.receiveTimeout = receiveTimeout;
+    }
+
+    public int getCacheLevel() {
+        return cacheLevel;
+    }
+
+    public void setCacheLevel(int cacheLevel) {
+        this.cacheLevel = cacheLevel;
+    }
+
+    public int getInitialReconnectDuration() {
+        return initialReconnectDuration;
+    }
+
+    public void setInitialReconnectDuration(int initialReconnectDuration) {
+        this.initialReconnectDuration = initialReconnectDuration;
+    }
+
+    public double getReconnectionProgressionFactor() {
+        return reconnectionProgressionFactor;
+    }
+
+    public void setReconnectionProgressionFactor(double reconnectionProgressionFactor) {
+        this.reconnectionProgressionFactor = reconnectionProgressionFactor;
+    }
+
+    public long getMaxReconnectDuration() {
+        return maxReconnectDuration;
+    }
+
+    public void setMaxReconnectDuration(long maxReconnectDuration) {
+        this.maxReconnectDuration = maxReconnectDuration;
+    }
+
+    public int getMaxMessagesPerTask() {
+        return maxMessagesPerTask;
+    }
+
+    public void setMaxMessagesPerTask(int maxMessagesPerTask) {
+        this.maxMessagesPerTask = maxMessagesPerTask;
+    }
+
+    public String getUserTransactionJNDIName() {
+        return userTransactionJNDIName;
+    }
+
+    public void setUserTransactionJNDIName(String userTransactionJNDIName) {
+        if (userTransactionJNDIName != null) {
+            this.userTransactionJNDIName = userTransactionJNDIName;
+        }
+    }
+
+    public boolean isCacheUserTransaction() {
+        return cacheUserTransaction;
+    }
+
+    public void setCacheUserTransaction(Boolean cacheUserTransaction) {
+        if (cacheUserTransaction != null) {
+            this.cacheUserTransaction = cacheUserTransaction;
+        }
+    }
+
+    public boolean isJmsSpec11() {
+        return jmsSpec11;
+    }
+
+    public void setJmsSpec11(boolean jmsSpec11) {
+        this.jmsSpec11 = jmsSpec11;
+    }
+
+    public Hashtable<String, String> getJmsProperties() {
+        return jmsProperties;
+    }
+
+    public void addJmsProperties(Map<String, String> jmsProperties) {
+        this.jmsProperties.putAll(jmsProperties);
+    }
+
+    public void removeJmsProperties(String key) {
+        this.jmsProperties.remove(key);
+    }
+
+    public Context getContext() {
+        return context;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return conFactory;
+    }
+
+    public List<MessageListenerTask> getPollingTasks() {
+        return pollingTasks;
+    }
+
+    public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) {
+        this.jmsMessageReceiver = jmsMessageReceiver;
+    }
+
+    public void setWorkerPool(WorkerPool workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    public int getActiveTaskCount() {
+        return activeTaskCount;
+    }
+
+    public void setServiceTaskManagerState(int serviceTaskManagerState) {
+        this.serviceTaskManagerState = serviceTaskManagerState;
+    }
+}

Added: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html?rev=724432&view=auto
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html (added)
+++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html Mon Dec  8 10:15:40 2008
@@ -0,0 +1,356 @@
+<html>
+<title>JMS Transport Configuration</title>
+<body>
+
+<h2>JMS Listener Configuration (axis2.xml)</h2>
+
+e.g:
+
+<pre>
+    &lt;transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"&gt;
+        &lt;parameter name="myTopicConnectionFactory"&gt;
+            &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+            &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;TopicConnectionFactory&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.ConnectionFactoryType"&gt;topic&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.0.2b&lt;/parameter&gt;
+        &lt;/parameter&gt;
+
+        &lt;parameter name="myQueueConnectionFactory"&gt;
+            &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+            &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;QueueConnectionFactory&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.ConnectionFactoryType"&gt;queue&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.1&lt;/parameter&gt;
+        &lt;/parameter&gt;
+
+        &lt;parameter name="default"&gt;
+            &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+            &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;ConnectionFactory&lt;/parameter&gt;
+        &lt;/parameter&gt;
+    &lt;/transportReceiver&gt;
+
+    &lt;transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"&gt;
+        &lt;parameter name="myTopicConnectionFactory"&gt;
+            &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+            &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;TopicConnectionFactory&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.0.2b&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.CacheLevel"&gt;producer&lt;/parameter&gt;
+        &lt;/parameter&gt;
+
+        &lt;parameter name="myQueueConnectionFactory"&gt;
+            &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+            &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;QueueConnectionFactory&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.0.2b&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.CacheLevel"&gt;producer&lt;/parameter&gt;
+        &lt;/parameter&gt;
+
+        &lt;parameter name="default"&gt;
+            &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+            &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;ConnectionFactory&lt;/parameter&gt;
+            &lt;parameter name="transport.jms.CacheLevel"&gt;connection&lt;/parameter&gt;
+        &lt;/parameter&gt;
+    &lt;/transportSender&gt;
+</pre>
+
+<p>
+    The Transport Listener and Sender both allow the user to configure one or more logical JMS Connection
+    Factories, which are named definitions as shown above. Any remaining parameters may be defined at the
+    service level via the services.xml. The applicable set of parameters for a service would be the
+    union of the properties from the services.xml and the corresponding logical connection factory.
+</p>
+
+<TABLE WIDTH="100%" BORDER=1 BORDERCOLOR="#000000" CELLPADDING=4 CELLSPACING=0>
+	<COL WIDTH="10%">
+	<COL WIDTH="20%">
+	<COL WIDTH="60%">
+	<COL WIDTH="5%">
+	<COL WIDTH="5%">
+    <tr>
+        <td>Transport level</td>
+        <td><BR></td>
+        <td><BR></td>
+        <td>Listening</td>
+        <td>Sending</td>
+    </tr>
+    <tr>
+        <td>JNDI</td>
+        <td>java.naming.factory.initial</td>
+        <td>The JNDI InitialContext factory class</td>
+        <td>Required</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>java.naming.provider.url</td>
+        <td>JNDI Provider URL</td>
+        <td>Required</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>java.naming.security.principal</td>
+        <td>Username for JNDI access</td>
+        <td><BR></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>java.naming.security.credentials</td>
+        <td>Password for JNDI access</td>
+        <td><BR></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td>Transactions</td>
+        <td>transport.Transactionality</td>
+        <td>Desired transactionality. One of none / local / jta</td>
+        <td>Defaults to <B>none</B></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.UserTxnJNDIName</td>
+        <td>JNDI name to be used to obtain a UserTransaction</td>
+        <td>Defaults to &quot;java:comp/UserTransaction&quot;</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.CacheUserTxn</td>
+        <td>Generally it's safe and more efficient to cache the
+			UserTransaction reference from JNDI. One of true/ false</td>
+        <td>Defaults to <B>true</B></td>
+        <td><BR></td>
+    </tr>
+
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.SessionTransacted</td>
+        <td>Should the JMS Session be transacted. One of true/ false</td>
+        <td>Defaults to <B>true</B> when local transactions are used</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.SessionAcknowledgement</td>
+        <td>JMS Session acknowledgement mode to be used. One of AUTO_ACKNOWLEDGE | CLIENT_ACKNOWLEDGE | DUPS_OK_ACKNOWLEDGE | SESSION_TRANSACTED</td>
+        <td>Defaults to <B>AUTO_ACKNOWLEDGE</B></td>
+        <td><BR></td>
+    </tr>    
+
+    <tr>
+        <td>Connection</td>
+        <td>transport.jms.ConnectionFactory</td>
+        <td>Name of the logical connection factory this service will use</td>
+        <td>Defaults to &quot;default&quot;</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.ConnectionFactoryJNDIName</td>
+        <td>The JNDI name of the JMS ConnectionFactory</td>
+        <td>Required</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.ConnectionFactoryType</td>
+        <td> Type of ConnectionFactory &ndash; queue / topic</td>
+        <td>Suggested to be specified</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.JMSSpecVersion</td>
+        <td>JMS API version. One of &quot;1.1&quot; or &quot;1.0.2b&quot;</td>
+        <td>Defaults to 1.1</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.UserName</td>
+        <td>The JMS connection username</td>
+        <td><BR></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.Password</td>
+        <td>The JMS connection password</td>
+        <td><BR></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td>Destinations</td>
+        <td>transport.jms.Destination</td>
+        <td>JNDI Name of the Destination </td>
+        <td>Defaults to Service name</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.DestinationType</td>
+        <td>Type of Destination &ndash; queue / topic</td>
+        <td>Defaults to a queue</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.DefaultReplyDestination</td>
+        <td>JNDI Name of the default reply Destination</td>
+        <td><BR></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.DefaultReplyDestinationType</td>
+        <td>Type of the reply Destination &ndash; queue / topic</td>
+        <td>Same type as of  Destination</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td>Advanced</td>
+        <td>transport.jms.MessageSelector</td>
+        <td>Optional message selector to be applied</td>
+        <td><BR></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.SubscriptionDurable</td>
+        <td>Is the subscription durable? (For Topics) &ndash; true / false</td>
+        <td>Defaults to <B>false</B></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.DurableSubscriberName</td>
+        <td>Name to be used for the durable subscription</td>
+        <td>Required when subscription is durable</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.PubSubNoLocal</td>
+        <td>Should messages published by the same connection (for Topics)
+			be received? &ndash; true / false</td>
+        <td>Defaults to <B>false</B></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.CacheLevel</td>
+        <td>The JMS resource cache level. One of none / connection /
+			session / consumer / producer / auto</td>
+        <td>Defaults to <B>auto</B> </td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.ReceiveTimeout</td>
+        <td>Time to wait for a JMS message during polling. A negative value
+			means wait forever, while 0 means do not wait at all. Any other
+			value is the poll timeout in milliseconds</td>
+        <td>Defaults to 1000ms</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.ConcurrentConsumers</td>
+        <td>Number of concurrent consumer tasks (~threads) to be started to
+			poll for messages for this service. For Topics, this should be
+			always 1, to prevent the same message being processed multiple
+			times</td>
+        <td>Defaults to <B>1</B></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.MaxConcurrentConsumers</td>
+        <td>Will dynamically scale the number of concurrent consumer tasks
+			(~threads) until this value; as the load increases. Should always
+			be 1 for Topics.</td>
+        <td>Defaults to <B>1</B></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.IdleTaskLimit</td>
+        <td>The number of idle (i.e. poll without receipt of a message)
+			runs per task, before it dies &ndash; to recycle resources, and to
+			allow dynamic scale down.</td>
+        <td>Defaults to 10</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.MaxMessagesPerTask</td>
+        <td>The maximum number of successful message receipts to limit per
+			Task lifetime. </td>
+        <td>Defaults to <B>&ndash;1</B> which implies unlimited messages</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td>Reconnection</td>
+        <td>transport.jms.InitialReconnectDuration</td>
+        <td>Initial reconnection attempt duration</td>
+        <td>Defaults to 10,000ms</td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.ReconnectProgressFactor</td>
+        <td>Factor used to compute consecutive reconnection attempt
+			durations, in a geometric series</td>
+        <td>Defaults to <B>2 (i.e. exponential)</B></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.MaxReconnectDuration</td>
+        <td>Maximum limit for a reconnection duration</td>
+        <td>Defaults to <B>1 hour</B></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>transport.jms.PublishEPR</td>
+        <td>One or more JMS URLs to be shown as the JMS EPRs on the WSDL
+			for the service. Allows the specification of a custom EPR, and/or
+			hiding of internal properties from a public EPR (e.g.
+			credentials). Add one as LEGACY to retain auto generated EPR, when
+			adding new EPRs</td>
+        <td><BR></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td><BR></td>
+        <td><BR></td>
+        <td><BR></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td>Legacy Mode and Payload handling</td>
+        <td>Wrapper</td>
+        <td>Binary and Text payload wrapper element to be specified as &quot;{ns}name&quot; where ns refers to a namespace and name the name of the element</td>
+        <td>Default binary wrapper<ul><li>{http://ws.apache.org/commons/ns/payload}binary</li></ul>
+            Default text wrapper <ul><li>{http://ws.apache.org/commons/ns/payload}text</li></ul></td>
+        <td><BR></td>
+    </tr>
+    <tr>
+        <td><BR></td>
+        <td>Operation</td>
+        <td>operation name to be specified as &quot;{ns}name&quot; where ns refers to the namespace and name the name of the operation</td>
+        <td>Defaults to urn:mediate</td>
+        <td><BR></td>
+    </tr>
+</TABLE>
+
+</body>
+</html>
\ No newline at end of file

Modified: webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java (original)
+++ webservices/commons/trunk/modules/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java Mon Dec  8 10:15:40 2008
@@ -71,20 +71,24 @@
         this.messages = messages;
         this.preloadMessages = preloadMessages;
     }
+
+    private int concurrencyReached;
+    private final Object concurrencyReachedLock = new Object();
+    private final Object shutdownAwaitLock = new Object();
     
     @Override
     protected void runTest() throws Throwable {
         int endpointCount = channels.length;
         int expectedConcurrency = endpointCount * messages;
         
-        final CountDownLatch shutdownLatch = new CountDownLatch(1);
-        final CountDownLatch concurrencyReachedLatch = new CountDownLatch(expectedConcurrency);
-        
         final MessageReceiver messageReceiver = new MessageReceiver() {
             public void receive(MessageContext msgContext) throws AxisFault {
-                concurrencyReachedLatch.countDown();
+                synchronized (concurrencyReachedLock) {
+                    concurrencyReached++;
+                    concurrencyReachedLock.notifyAll();
+                }
                 try {
-                    shutdownLatch.await();
+                    shutdownAwaitLock.wait();
                 } catch (InterruptedException ex) {
                 }
             }
@@ -135,14 +139,25 @@
                     endpointResourceSets[i] = endpointResourceSet;
                 }
             }
-        
-            if (!concurrencyReachedLatch.await(5, TimeUnit.SECONDS)) {
-                fail("Concurrency reached is " + (expectedConcurrency -
-                        concurrencyReachedLatch.getCount()) + ", but expected " +
-                        expectedConcurrency);
+
+            long startTime = System.currentTimeMillis();
+            while (concurrencyReached < expectedConcurrency
+                && System.currentTimeMillis() < (startTime + 5000)) {
+                synchronized(concurrencyReachedLock) {
+                    concurrencyReachedLock.wait(5000);
+                }
+            }
+            
+            synchronized(shutdownAwaitLock) {
+                shutdownAwaitLock.notifyAll();
             }
+
+            if (concurrencyReached < expectedConcurrency) {
+                fail("Concurrency reached is " + concurrencyReached + ", but expected " +
+                    expectedConcurrency);
+            }
+
         } finally {
-            shutdownLatch.countDown();
             for (int i=0; i<endpointCount; i++) {
                 if (endpointResourceSets[i] != null) {
                     endpointResourceSets[i].tearDown();

Modified: webservices/commons/trunk/modules/transport/modules/tests/log4j.properties
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/log4j.properties?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/log4j.properties (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/log4j.properties Mon Dec  8 10:15:40 2008
@@ -20,8 +20,12 @@
 # log4j configuration file used by unit tests
 
 log4j.rootCategory=DEBUG, CONSOLE
+#log4j.rootCategory=WARN, CONSOLE
+
+log4j.category.org.apache.axis2.transport.jms=TRACE
 
 log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
 log4j.appender.CONSOLE.threshold=ERROR
+#log4j.appender.CONSOLE.threshold=TRACE
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%5p [%t] %c{1} %m%n

Modified: webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java Mon Dec  8 10:15:40 2008
@@ -137,8 +137,8 @@
     }
 
     public void setupService(AxisService service, boolean isClientSide) throws Exception {
-        service.addParameter(JMSConstants.CONFAC_PARAM, connectionFactoryName);
-        service.addParameter(JMSConstants.DEST_PARAM_TYPE, destinationType);
-        service.addParameter(JMSConstants.DEST_PARAM, jndiName);
+        service.addParameter(JMSConstants.PARAM_JMS_CONFAC, connectionFactoryName);
+        service.addParameter(JMSConstants.PARAM_DEST_TYPE, destinationType);
+        service.addParameter(JMSConstants.PARAM_DESTINATION, jndiName);
     }
 }

Modified: webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java Mon Dec  8 10:15:40 2008
@@ -63,8 +63,8 @@
     @Override
     public void setupService(AxisService service, boolean isClientSide) throws Exception {
         super.setupService(service, isClientSide);
-        service.addParameter(JMSConstants.REPLY_PARAM_TYPE, replyDestinationType);
-        service.addParameter(JMSConstants.REPLY_PARAM, replyJndiName);
+        service.addParameter(JMSConstants.PARAM_REPLY_DEST_TYPE, replyDestinationType);
+        service.addParameter(JMSConstants.PARAM_REPLY_DESTINATION, replyJndiName);
     }
 
     public void setupRequestMessageContext(MessageContext msgContext) {
@@ -74,7 +74,7 @@
     @Override
     public EndpointReference getEndpointReference() throws Exception {
         String address = super.getEndpointReference().getAddress();
-        return new EndpointReference(address + "&" + JMSConstants.REPLY_PARAM_TYPE + "=" + replyDestinationType + "&" + JMSConstants.REPLY_PARAM + "=" + replyJndiName);
+        return new EndpointReference(address + "&" + JMSConstants.PARAM_REPLY_DEST_TYPE + "=" + replyDestinationType + "&" + JMSConstants.PARAM_REPLY_DESTINATION + "=" + replyJndiName);
     }
 
     @Key("replyDestType")

Modified: webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java Mon Dec  8 10:15:40 2008
@@ -47,6 +47,7 @@
     
     private final boolean singleCF;
     private final boolean cfOnSender;
+    private final int concurrentConsumers;
     private @Transient Context context;
     
     /**
@@ -58,9 +59,10 @@
      *                   should also be configured on the sender. This switch allows
      *                   us to build regression tests for SYNAPSE-448. 
      */
-    public JMSTransportDescriptionFactory(boolean singleCF, boolean cfOnSender) {
+    public JMSTransportDescriptionFactory(boolean singleCF, boolean cfOnSender, int concurrentConsumers) {
         this.singleCF = singleCF;
         this.cfOnSender = cfOnSender;
+        this.concurrentConsumers = concurrentConsumers;
     }
 
     @Setup @SuppressWarnings("unused")
@@ -108,18 +110,20 @@
         OMElement element = createParameterElement(JMSConstants.DEFAULT_CONFAC_NAME, null);
         element.addChild(createParameterElement(Context.INITIAL_CONTEXT_FACTORY,
                 MockContextFactory.class.getName()));
-        element.addChild(createParameterElement(JMSConstants.CONFAC_JNDI_NAME_PARAM,
+        element.addChild(createParameterElement(JMSConstants.PARAM_CONFAC_JNDI_NAME,
                 connFactName));
         if (type != null) {
-            element.addChild(createParameterElement(JMSConstants.CONFAC_TYPE, type));
+            element.addChild(createParameterElement(JMSConstants.PARAM_CONFAC_TYPE, type));
         }
+        element.addChild(createParameterElement(JMSConstants.PARAM_CONCURRENT_CONSUMERS,
+            Integer.toString(concurrentConsumers)));
         trpDesc.addParameter(new Parameter(name, element));
     }
     
     private void setupTransport(ParameterInclude trpDesc) throws AxisFault {
         if (singleCF) {
             // TODO: the type is now left unset (null) here; setting it to "queue" was nonsense but previously required by the transport (see SYNAPSE-439) - confirm and remove this note
-            setupConnectionFactoryConfig(trpDesc, "default", CONNECTION_FACTORY, "queue");
+            setupConnectionFactoryConfig(trpDesc, "default", CONNECTION_FACTORY, null);
         } else {
             setupConnectionFactoryConfig(trpDesc, "queue", QUEUE_CONNECTION_FACTORY, "queue");
             setupConnectionFactoryConfig(trpDesc, "topic", TOPIC_CONNECTION_FACTORY, "topic");

Modified: webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java?rev=724432&r1=724431&r2=724432&view=diff
==============================================================================
--- webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java (original)
+++ webservices/commons/trunk/modules/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java Mon Dec  8 10:15:40 2008
@@ -51,14 +51,19 @@
         
         // SYNAPSE-436:
         suite.addExclude("(&(test=EchoXML)(replyDestType=topic)(endpoint=axis))");
-        
+
+        // Examples for running only a few use cases. Please leave these commented out - asankha
+        //suite.addExclude("(|(test=AsyncXML)(test=MinConcurrency)(destType=topic)(broker=qpid)(destType=topic)(replyDestType=topic)(client=jms)(endpoint=mock)(cfOnSender=true))");
+        //suite.addExclude("(|(test=EchoXML)(destType=queue)(broker=qpid)(cfOnSender=true)(singleCF=false)(destType=queue)(client=jms)(endpoint=mock))");
+        //suite.addExclude("(|(test=EchoXML)(test=AsyncXML)(test=AsyncSwA)(test=AsyncTextPlain)(test=AsyncBinary)(test=AsyncSOAPLarge)(broker=qpid))");
+
         TransportTestSuiteBuilder builder = new TransportTestSuiteBuilder(suite);
 
         JMSTestEnvironment[] environments = new JMSTestEnvironment[] { new QpidTestEnvironment(), new ActiveMQTestEnvironment() };
         for (boolean singleCF : new boolean[] { false, true }) {
             for (boolean cfOnSender : new boolean[] { false, true }) {
                 for (JMSTestEnvironment env : environments) {
-                    builder.addEnvironment(env, new JMSTransportDescriptionFactory(singleCF, cfOnSender));
+                    builder.addEnvironment(env, new JMSTransportDescriptionFactory(singleCF, cfOnSender, 1));
                 }
             }
         }
@@ -87,12 +92,12 @@
         
         builder.addEchoEndpoint(new MockEchoEndpoint());
         builder.addEchoEndpoint(new AxisEchoEndpoint());
-        
+
         for (JMSTestEnvironment env : new JMSTestEnvironment[] { new QpidTestEnvironment(), new ActiveMQTestEnvironment() }) {
             suite.addTest(new MinConcurrencyTest(new AsyncChannel[] {
                     new JMSAsyncChannel("endpoint1", JMSConstants.DESTINATION_TYPE_QUEUE, ContentTypeMode.TRANSPORT),
                     new JMSAsyncChannel("endpoint2", JMSConstants.DESTINATION_TYPE_QUEUE, ContentTypeMode.TRANSPORT) },
-                    2, false, env, new JMSTransportDescriptionFactory(false, false)));
+                    2, false, env, new JMSTransportDescriptionFactory(false, false, 2)));
         }