You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/11/21 18:34:20 UTC

svn commit: r719648 - in /webservices/commons/trunk/scratch/asankha/transport/modules: jms/src/main/java/org/apache/axis2/transport/jms/ testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/ tests/ tests/src/test/java/org/apache/axis2/tr...

Author: asankha
Date: Fri Nov 21 09:34:18 2008
New Revision: 719648

URL: http://svn.apache.org/viewvc?rev=719648&view=rev
Log:
fix connection, session and consumer caching as pointed out by andreas
fix problem in minconcurrency test for proper operation

Modified:
    webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
    webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
    webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java
    webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties
    webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java

Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Fri Nov 21 09:34:18 2008
@@ -132,15 +132,9 @@
             contentTypeProperty = jmsOut.getContentTypeProperty();
         }
 
-        if (messageSender.getCacheLevel() < JMSConstants.CACHE_SESSION) {
-            // only connection has been cached at most
+        // need to synchronize as Sessions are not thread safe
+        synchronized (messageSender.getSession()) {
             sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
-            
-        } else {
-            // need to synchronize as Sessions are not thread safe
-            synchronized (messageSender.getSession()) {
-                sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
-            }
         }
     }
 

Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Fri Nov 21 09:34:18 2008
@@ -48,7 +48,7 @@
  * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh
  * for the service, by discarding all connections. 
  */
-public class ServiceTaskManager implements ExceptionListener {
+public class ServiceTaskManager {
 
     /** The logger */
     private static final Log log = LogFactory.getLog(ServiceTaskManager.class);
@@ -122,12 +122,6 @@
     private Context context = null;
     /** The ConnectionFactory to be used */
     private ConnectionFactory conFactory = null;
-    /** The shared JMS Connection opened */
-    private Connection sharedConnection = null;
-    /** The shared JMS Connection opened */
-    private Session sharedSession = null;
-    /** The shared JMS Connection opened */
-    private MessageConsumer sharedConsumer = null;
     /** The JMS Destination */
     private Destination destination = null;
 
@@ -144,41 +138,6 @@
     /** The shared thread pool from the Listener */
     private WorkerPool workerPool = null;
 
-    /** Handle JMS Connection exceptions by re-initializing. A single connection failure could
-     * cause re-initialization of multiple MessageListenerTasks / Connections
-     */
-    public void onException(JMSException j) {
-
-        if (!isActive()) {
-            return;
-        }
-
-        // if we failed while active, update state to show failure
-        state = STATE_FAILURE;
-        log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks", j);
-
-        int r = 1;
-        long retryDuration = initialReconnectDuration;
-
-        do {
-            try {
-                log.info("Reconnection attempt : " + r + " for service : " + serviceName);
-                start();
-            } catch (Exception e) {
-                log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName +
-                    " failed. Next retry in " + (retryDuration/1000) + "seconds", e);
-                retryDuration = (long) (retryDuration * reconnectionProgressionFactor);
-                if (retryDuration > maxReconnectDuration) {
-                    retryDuration = maxReconnectDuration;
-                }
-
-                try {
-                    Thread.sleep(retryDuration);
-                } catch (InterruptedException ignore) {}
-            }
-        } while (!isActive());
-    }
-
     /**
      * Start or re-start the Task Manager by shutting down any existing worker tasks and
      * re-creating them. However, if this is STM is PAUSED, a start request is ignored.
@@ -208,16 +167,16 @@
                     "worker tasks of service : " + serviceName);
                 break;
             case JMSConstants.CACHE_CONNECTION:
-                log.debug("Only the JMS Connection will be cached and shared between poller " +
-                    "tasks of service : " + serviceName);
+                log.debug("Only the JMS Connection will be cached and shared between successive " +
+                    "poller task invocations");
                 break;
             case JMSConstants.CACHE_SESSION:
                 log.debug("The JMS Connection and Session will be cached and shared between " +
-                    "poller tasks of service : " + serviceName);
+                    "successive poller task invocations");
                 break;
             case JMSConstants.CACHE_CONSUMER:
                 log.debug("The JMS Connection, Session and MessageConsumer will be cached and " +
-                    "shared between poller tasks of service : " + serviceName);
+                    "shared between successive poller task invocations");
                 break;
             default : {
                 handleException("Invalid cache level : " + cacheLevel +
@@ -266,42 +225,6 @@
             log.warn("Unable to shutdown all polling tasks of service : " + serviceName);
         }
 
-        if (sharedConsumer != null) {
-            log.debug("Closing shared Consumer - service : " + serviceName);
-            try {
-                sharedConsumer.close();
-            } catch (IllegalStateException ignore) {
-            } catch (JMSException e) {
-                logError("Error closing shared JMS consumer", e);
-            } finally {
-                sharedConsumer = null;
-            }
-        }
-
-        if (sharedSession != null) {
-            log.debug("Closing shared Session - service : " + serviceName);
-            try {
-                sharedSession.close();
-            } catch (IllegalStateException ignore) {
-            } catch (JMSException e) {
-                logError("Error closing shared JMS session", e);
-            } finally {
-                sharedSession = null;
-            }
-        }
-
-        if (sharedConnection != null) {
-            log.debug("Closing shared Connection - service : " + serviceName);
-            try {
-                sharedConnection.close();
-            } catch (IllegalStateException ignore) {
-            } catch (JMSException e) {
-                logError("Error closing shared JMS connection", e);
-            } finally {
-                sharedConnection = null;
-            }
-        }
-
         if (state != STATE_FAILURE) {
             state = STATE_STOPPED;
         }
@@ -316,14 +239,6 @@
         for (MessageListenerTask lstTask : pollingTasks) {
             lstTask.pause();
         }
-
-        if (sharedConnection != null) {
-            try {
-                sharedConnection.stop();
-            } catch (JMSException e) {
-                logError("Error pausing shared JMS connection", e);
-            }
-        }
     }
 
     /**
@@ -333,14 +248,6 @@
         for (MessageListenerTask lstTask : pollingTasks) {
             lstTask.resume();
         }
-
-        if (sharedConnection != null) {
-            try {
-                sharedConnection.start();
-            } catch (JMSException e) {
-                logError("Error pausing shared JMS connection", e);
-            }
-        }
     }
 
     /**
@@ -371,7 +278,7 @@
     /**
      * The actual threads/tasks that perform message polling
      */
-    private class MessageListenerTask implements Runnable {
+    private class MessageListenerTask implements Runnable, ExceptionListener {
 
         /** The Connection used by the polling task */
         private Connection connection = null;
@@ -398,7 +305,7 @@
          */
         public void pause() {
             if (isActive()) {
-                if (connection != null && connection != sharedConnection) {
+                if (connection != null) {
                     try {
                         connection.stop();
                     } catch (JMSException e) {
@@ -413,7 +320,7 @@
          * Resume this polling task
          */
         public void resume() {
-            if (connection != null && connection != sharedConnection) {
+            if (connection != null) {
                 try {
                     connection.start();
                 } catch (JMSException e) {
@@ -493,17 +400,16 @@
                     " is stopping after processing : " + messageCount + " messages");
             }
 
+            closeConsumer(true);
+            closeSession(true);
+            closeConnection(true);
+
             activeTaskCount--;
             synchronized(pollingTasks) {
                 pollingTasks.remove(this);
             }
             // My time is up, so if I am going away, create another
             scheduleNewTaskIfAppropriate();
-
-            // close any non-shared resources
-            closeConsumer(consumer);
-            closeSession(session);
-            closeConnection(connection);
         }
 
         /**
@@ -515,10 +421,10 @@
 
             // get a new connection, session and consumer to prevent a conflict.
             // If idle, it means we can re-use what we already have 
-            if (!idle) {
+            if (consumer == null) {
                 connection = getConnection();
-                session = getSession(connection);
-                consumer = getMessageConsumer(connection, session);
+                session = getSession();
+                consumer = getMessageConsumer();
                 if (log.isDebugEnabled()) {
                     log.debug("Preparing a Connection, Session and Consumer to read messages");
                 }
@@ -575,7 +481,7 @@
                 }
 
                 // close the consumer
-                closeConsumer(consumer);
+                closeConsumer(false);
 
                 // if session was transacted, commit it or rollback
                 try {
@@ -617,9 +523,44 @@
                         " JTA txn for message : " + messageId + " from the session", e);
                 }
 
-                closeSession(session);
-                closeConnection(connection);
+                closeSession(false);
+                closeConnection(false);
+            }
+        }
+
+        /** Handle JMS Connection exceptions by re-initializing. A single connection failure could
+         * cause re-initialization of multiple MessageListenerTasks / Connections
+         */
+        public void onException(JMSException j) {
+
+            if (!isSTMActive()) {
+                return;
             }
+
+            // if we failed while active, update state to show failure
+            setState(STATE_FAILURE);
+            log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks", j);
+
+            int r = 1;
+            long retryDuration = initialReconnectDuration;
+
+            do {
+                try {
+                    log.info("Reconnection attempt : " + r + " for service : " + serviceName);
+                    start();
+                } catch (Exception e) {
+                    log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName +
+                        " failed. Next retry in " + (retryDuration/1000) + "seconds", e);
+                    retryDuration = (long) (retryDuration * reconnectionProgressionFactor);
+                    if (retryDuration > maxReconnectDuration) {
+                        retryDuration = maxReconnectDuration;
+                    }
+
+                    try {
+                        Thread.sleep(retryDuration);
+                    } catch (InterruptedException ignore) {}
+                }
+            } while (!isSTMActive());
         }
 
         protected void requestShutdown() {
@@ -633,151 +574,184 @@
         protected boolean isTaskIdle() {
             return idle;
         }
-    }
 
-    // -------------- mundane private methods ----------------
-    /**
-     * Close the given Connection, hiding exceptions if any which are logged
-     * @param connection the Connection to be closed
-     */
-    private void closeConnection(Connection connection) {
-        try {
-            if (connection != null && connection != sharedConnection) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Closing non-shared JMS connection for service : " + serviceName);
-                }
-                connection.close();
+        /**
+         * Get a Connection that could/should be used by this task - depends on the cache level to reuse
+         * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection
+         */
+        private Connection getConnection() {
+            if (connection == null || cacheLevel < JMSConstants.CACHE_CONNECTION) {
+                connection = createConnection();
             }
-        } catch (JMSException e) {
-            logError("Error closing JMS connection", e);
-        } finally {
-            connection = null;
+            return connection;
         }
-    }
 
-    /**
-     * Close the given Session, hiding exceptions if any which are logged
-     * @param session the Session to be closed
-     */
-    private void closeSession(Session session) {
-        try {
-            if (session != null && session != sharedSession) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Closing non-shared JMS session for service : " + serviceName);
-                }
-                session.close();
+        /**
+         * Get a Session that could/should be used by this task - depends on the cache level to reuse
+         * @param connection the connection (could be the shared connection) to use to create a Session
+         * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session
+         * created using the Connection passed, or a new/shared connection
+         */
+        private Session getSession() {
+            if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) {
+                session = createSession();
             }
-        } catch (JMSException e) {
-            logError("Error closing JMS session", e);
-        } finally {
-            session = null;
+            return session;
         }
-    }
 
-    /**
-     * Close the given Consumer, hiding exceptions if any which are logged
-     * @param consumer the Consumer to be closed
-     */
-    private void closeConsumer(MessageConsumer consumer) {
-        try {
-            if (consumer != null && consumer != sharedConsumer) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Closing non-shared JMS consumer for service : " + serviceName);
-                }
-                consumer.close();
-                consumer = null;
+        /**
+         * Get a MessageConsumer that could/should be used by this task - depends on the cache
+         * level to reuse
+         * @param connection optional Connection to be used
+         * @param session optional Session to be used
+         * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new
+         * MessageConsumer possibly using the Connection and Session passed in
+         */
+        private MessageConsumer getMessageConsumer() {
+            if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) {
+                consumer = createConsumer();
             }
-        } catch (JMSException e) {
-            logError("Error closing JMS consumer", e);
-        } finally {
-            consumer = null;
+            return consumer;
         }
-    }
 
-    /**
-     * Get a Connection that could/should be used by this STM - depends on the cache level to reuse
-     * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection
-     */
-    private Connection getConnection() {
-        if (cacheLevel > JMSConstants.CACHE_NONE) {
-            return getSharedConnection();
-        } else {
-            return createConnection();
+        /**
+         * Close the given Connection, hiding exceptions if any which are logged
+         * @param connection the Connection to be closed
+         */
+        private void closeConnection(boolean forced) {
+            if (connection != null &&
+                (cacheLevel < JMSConstants.CACHE_CONNECTION || forced)) {
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Closing non-shared JMS connection for service : " + serviceName);
+                    }
+                    connection.close();
+                } catch (JMSException e) {
+                    logError("Error closing JMS connection", e);
+                } finally {
+                    connection = null;
+                }
+            }
         }
-    }
 
-    /**
-     * Get a Session that could/should be used by this STM - depends on the cache level to reuse
-     * @param connection the connection (could be the shared connection) to use to create a Session
-     * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session
-     * created using the Connection passed, or a new/shared connection
-     */
-    private Session getSession(Connection connection) {
-        if (cacheLevel > JMSConstants.CACHE_CONNECTION) {
-            return getSharedSession();
-        } else {
-            return createSession((connection == null ? getConnection() : connection));
+        /**
+         * Close the given Session, hiding exceptions if any which are logged
+         * @param session the Session to be closed
+         */
+        private void closeSession(boolean forced) {
+            if (session != null &&
+                (cacheLevel < JMSConstants.CACHE_SESSION || forced)) {
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Closing non-shared JMS session for service : " + serviceName);
+                    }
+                    session.close();
+                } catch (JMSException e) {
+                    logError("Error closing JMS session", e);
+                } finally {
+                    session = null;
+                }
+            }
         }
-    }
 
-    /**
-     * Get a MessageConsumer that chould/should be used by this STM - depends on the cache level to
-     * reuse
-     * @param connection option Connection to be used
-     * @param session optional Session to be used
-     * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new
-     * MessageConsumer possibly using the Connection and Session passed in
-     */
-    private MessageConsumer getMessageConsumer(Connection connection, Session session) {
-        if (cacheLevel > JMSConstants.CACHE_SESSION) {
-            return getSharedConsumer();
-        } else {
-            return createConsumer((session == null ? getSession(connection) : session));
+        /**
+         * Close the given Consumer, hiding exceptions if any which are logged
+         * @param consumer the Consumer to be closed
+         */
+        private void closeConsumer(boolean forced) {
+            if (consumer != null &&
+                (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) {
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Closing non-shared JMS consumer for service : " + serviceName);
+                    }
+                    consumer.close();
+                } catch (JMSException e) {
+                    logError("Error closing JMS consumer", e);
+                } finally {
+                    consumer = null;
+                }
+            }
         }
-    }
 
-    /**
-     * Get the shared Connection for this STM
-     * @return shared Connection for the STM
-     */
-    private synchronized Connection getSharedConnection() {
-        if  (sharedConnection == null) {
-            sharedConnection = createConnection();
-            if (log.isDebugEnabled()) {
-                log.debug("Created shared JMS Connection for service : " + serviceName);
+        /**
+         * Create a new Connection for this STM, using JNDI properties and credentials provided
+         * @return a new Connection for this STM, using JNDI properties and credentials provided
+         */
+        private Connection createConnection() {
+
+            try {
+                conFactory = JMSUtils.lookup(
+                    getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName());
+                log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName());
+            } catch (NamingException e) {
+                handleException("Error looking up connection factory : " + getConnFactoryJNDIName() +
+                    " using JNDI properties : " + jndiProperties, e);
+            }
+
+            Connection connection = null;
+            try {
+                connection = JMSUtils.createConnection(
+                    conFactory,
+                    jndiProperties.get(Context.SECURITY_PRINCIPAL),
+                    jndiProperties.get(Context.SECURITY_CREDENTIALS),
+                    isJmsSpec11(), isQueue());
+
+                connection.setExceptionListener(this);
+                connection.start();
+                log.info("JMS Connection for service : " + serviceName + " created and started");
+
+            } catch (JMSException e) {
+                handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() +
+                    " using JNDI properties : " + jndiProperties, e);
             }
+            return connection;
         }
-        return sharedConnection;
-    }
 
-    /**
-     * Get the shared Session for the STM
-     * @return shared Session for the STM
-     */
-    private synchronized Session getSharedSession() {
-        if (sharedSession == null) {
-            sharedSession = createSession(getSharedConnection());
-            if (log.isDebugEnabled()) {
-                log.debug("Created shared JMS Session for service : " + serviceName);
+        /**
+         * Create a new Session for this STM
+         * @param connection the Connection to be used
+         * @return a new Session created using the Connection passed in
+         */
+        private Session createSession() {
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Creating a new JMS Session for service : " + serviceName);
+                }
+                return JMSUtils.createSession(
+                    connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue());
+
+            } catch (JMSException e) {
+                handleException("Error creating JMS session for service : " + serviceName, e);
             }
+            return null;
         }
-        return sharedSession;
-    }
 
-    /**
-     * Get the shared MessageConsumer for the STM
-     * @return shared MessageConsumer for the STM
-     */
-    private synchronized MessageConsumer getSharedConsumer() {
-        if (sharedConsumer == null) {
-            sharedConsumer = createConsumer(getSharedSession());
-            if (log.isDebugEnabled()) {
-                log.debug("Created shared JMS MessageConsumer for service : " + serviceName);
+        /**
+         * Create a new MessageConsumer for this STM
+         * @param session the Session to be used
+         * @return a new MessageConsumer created using the Session passed in
+         */
+        private MessageConsumer createConsumer() {
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Creating a new JMS MessageConsumer for service : " + serviceName);
+                }
+
+                return JMSUtils.createConsumer(
+                    session, getDestination(), isQueue(),
+                    (isSubscriptionDurable() && getDurableSubscriberName() == null ?
+                        getDurableSubscriberName() : serviceName),
+                    getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11());
+
+            } catch (JMSException e) {
+                handleException("Error creating JMS consumer for service : " + serviceName,e);
             }
+            return null;
         }
-        return sharedConsumer;
     }
 
+    // -------------- mundane private methods ----------------
     /**
      * Get the InitialContext for lookup using the JNDI parameters applicable to the service
      * @return the InitialContext to be used
@@ -800,7 +774,7 @@
                 context = getInitialContext();
                 destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName());
                 if (log.isDebugEnabled()) {
-                    log.debug("JMS Destionation with JNDI name : " + getDestinationJNDIName() +
+                    log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() +
                         " found for service " + serviceName);
                 }
             } catch (NamingException e) {
@@ -847,85 +821,8 @@
         return sharedUserTransaction;
     }
 
-    /**
-     * Create a new Connection for this STM, using JNDI properties and credentials provided
-     * @return a new Connection for this STM, using JNDI properties and credentials provided
-     */
-    private Connection createConnection() {
-
-        try {
-            conFactory = JMSUtils.lookup(
-                getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName());
-            log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName());
-        } catch (NamingException e) {
-            handleException("Error looking up connection factory : " + getConnFactoryJNDIName() +
-                " using JNDI properties : " + jndiProperties, e);
-        }
-
-        Connection connection = null;
-        try {
-            connection = JMSUtils.createConnection(
-                conFactory,
-                jndiProperties.get(Context.SECURITY_PRINCIPAL),
-                jndiProperties.get(Context.SECURITY_CREDENTIALS),
-                isJmsSpec11(), isQueue());
-
-            connection.setExceptionListener(this);
-            connection.start();
-            log.info("JMS Connection for service : " + serviceName + " created and started");
-
-        } catch (JMSException e) {
-            handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() +
-                " using JNDI properties : " + jndiProperties, e);
-        }
-        return connection;
-    }
-
-    /**
-     * Create a new Session for this STM
-     * @param connection the Connection to be used
-     * @return a new Session created using the Connection passed in
-     */
-    private Session createSession(Connection connection) {
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug("Creating a new JMS Session for service : " + serviceName);
-            }
-            return JMSUtils.createSession(
-                connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue());
-
-        } catch (JMSException e) {
-            handleException("Error creating JMS session for service : " + serviceName, e);
-        }
-        return null;
-    }
-
-    /**
-     * Create a new MessageConsumer for this STM
-     * @param session the Session to be used
-     * @return a new MessageConsumer created using the Session passed in
-     */
-    private MessageConsumer createConsumer(Session session) {
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug("Creating a new JMS MessageConsumer for service : " + serviceName);
-            }
-
-            return JMSUtils.createConsumer(
-                session, getDestination(), isQueue(), 
-                (isSubscriptionDurable() && getDurableSubscriberName() == null ?
-                    getDurableSubscriberName() : serviceName),
-                getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11());
-
-        } catch (JMSException e) {
-            handleException("Error creating JMS consumer for service : " + serviceName,e);
-        }
-        return null;
-    }
-
-
     // -------------------- trivial methods ---------------------
-    private boolean isActive() {
+    private boolean isSTMActive() {
         return state == STATE_STARTED;
     }
 
@@ -1188,6 +1085,10 @@
         return activeTaskCount;
     }
 
+    public void setState(int state) {
+        this.state = state;
+    }
+
     //--------------------- used for development testing---------------------------
     /*public static void main(String[] args) throws Exception {
         //org.apache.log4j.BasicConfigurator.configure();

Modified: webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java Fri Nov 21 09:34:18 2008
@@ -71,20 +71,24 @@
         this.messages = messages;
         this.preloadMessages = preloadMessages;
     }
+
+    private int concurrencyReached;
+    private final Object concurrencyReachedLock = new Object();
+    private final Object shutdownAwaitLock = new Object();
     
     @Override
     protected void runTest() throws Throwable {
         int endpointCount = channels.length;
         int expectedConcurrency = endpointCount * messages;
         
-        final CountDownLatch shutdownLatch = new CountDownLatch(1);
-        final CountDownLatch concurrencyReachedLatch = new CountDownLatch(expectedConcurrency);
-        
         final MessageReceiver messageReceiver = new MessageReceiver() {
             public void receive(MessageContext msgContext) throws AxisFault {
-                concurrencyReachedLatch.countDown();
                 try {
-                    shutdownLatch.await();
+                    // wait() must own the monitor; lock before counting so the later notifyAll() is never missed
+                    synchronized (shutdownAwaitLock) {
+                        synchronized (concurrencyReachedLock) { concurrencyReached++; concurrencyReachedLock.notifyAll(); }
+                        shutdownAwaitLock.wait();
+                    }
                 } catch (InterruptedException ex) {
                 }
             }
@@ -135,14 +139,25 @@
                     endpointResourceSets[i] = endpointResourceSet;
                 }
             }
-        
-            if (!concurrencyReachedLatch.await(5, TimeUnit.SECONDS)) {
-                fail("Concurrency reached is " + (expectedConcurrency -
-                        concurrencyReachedLatch.getCount()) + ", but expected " +
-                        expectedConcurrency);
+
+            long startTime = System.currentTimeMillis();
+            while (concurrencyReached < expectedConcurrency
+                && System.currentTimeMillis() < (startTime + 5000)) {
+                synchronized(concurrencyReachedLock) {
+                    concurrencyReachedLock.wait();
+                }
+            }
+            
+            synchronized(shutdownAwaitLock) {
+                shutdownAwaitLock.notifyAll();
             }
+
+            if (concurrencyReached < expectedConcurrency) {
+                fail("Concurrency reached is " + concurrencyReached + ", but expected " +
+                    expectedConcurrency);
+            }
+
         } finally {
-            shutdownLatch.countDown();
             for (int i=0; i<endpointCount; i++) {
                 if (endpointResourceSets[i] != null) {
                     endpointResourceSets[i].tearDown();

Modified: webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties Fri Nov 21 09:34:18 2008
@@ -20,10 +20,12 @@
 # log4j configuration file used by unit tests
 
 log4j.rootCategory=DEBUG, CONSOLE
+#log4j.rootCategory=WARN, CONSOLE
 
 log4j.category.org.apache.axis2.transport.jms=TRACE
 
 log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
 log4j.appender.CONSOLE.threshold=ERROR
+#log4j.appender.CONSOLE.threshold=TRACE
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%5p [%t] %c{1} %m%n

Modified: webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java Fri Nov 21 09:34:18 2008
@@ -53,6 +53,7 @@
         suite.addExclude("(&(test=EchoXML)(replyDestType=topic)(endpoint=axis))");
 
         // Example to run a few use cases.. please leave these commented out - asankha
+        //suite.addExclude("(|(test=AsyncXML)(test=MinConcurrency)(destType=topic)(broker=qpid)(destType=topic)(replyDestType=topic)(client=jms)(endpoint=mock)(cfOnSender=true))");
         //suite.addExclude("(|(test=EchoXML)(destType=queue)(broker=qpid)(cfOnSender=true)(singleCF=false)(destType=queue)(client=jms)(endpoint=mock))");
         //suite.addExclude("(|(test=EchoXML)(test=AsyncXML)(test=AsyncSwA)(test=AsyncTextPlain)(test=AsyncBinary)(test=AsyncSOAPLarge)(broker=qpid))");