You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by as...@apache.org on 2008/11/21 18:34:20 UTC
svn commit: r719648 - in
/webservices/commons/trunk/scratch/asankha/transport/modules:
jms/src/main/java/org/apache/axis2/transport/jms/
testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/ tests/
tests/src/test/java/org/apache/axis2/tr...
Author: asankha
Date: Fri Nov 21 09:34:18 2008
New Revision: 719648
URL: http://svn.apache.org/viewvc?rev=719648&view=rev
Log:
fix connection, session and consumer caching as pointed out by andreas
fix problem in minconcurrency test for proper operation
Modified:
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java
webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties
webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java Fri Nov 21 09:34:18 2008
@@ -132,15 +132,9 @@
contentTypeProperty = jmsOut.getContentTypeProperty();
}
- if (messageSender.getCacheLevel() < JMSConstants.CACHE_SESSION) {
- // only connection has been cached at most
+ // need to synchronize as Sessions are not thread safe
+ synchronized (messageSender.getSession()) {
sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
-
- } else {
- // need to synchronize as Sessions are not thread safe
- synchronized (messageSender.getSession()) {
- sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
- }
}
}
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Fri Nov 21 09:34:18 2008
@@ -48,7 +48,7 @@
* to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh
* for the service, by discarding all connections.
*/
-public class ServiceTaskManager implements ExceptionListener {
+public class ServiceTaskManager {
/** The logger */
private static final Log log = LogFactory.getLog(ServiceTaskManager.class);
@@ -122,12 +122,6 @@
private Context context = null;
/** The ConnectionFactory to be used */
private ConnectionFactory conFactory = null;
- /** The shared JMS Connection opened */
- private Connection sharedConnection = null;
- /** The shared JMS Connection opened */
- private Session sharedSession = null;
- /** The shared JMS Connection opened */
- private MessageConsumer sharedConsumer = null;
/** The JMS Destination */
private Destination destination = null;
@@ -144,41 +138,6 @@
/** The shared thread pool from the Listener */
private WorkerPool workerPool = null;
- /** Handle JMS Connection exceptions by re-initializing. A single connection failure could
- * cause re-initialization of multiple MessageListenerTasks / Connections
- */
- public void onException(JMSException j) {
-
- if (!isActive()) {
- return;
- }
-
- // if we failed while active, update state to show failure
- state = STATE_FAILURE;
- log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks", j);
-
- int r = 1;
- long retryDuration = initialReconnectDuration;
-
- do {
- try {
- log.info("Reconnection attempt : " + r + " for service : " + serviceName);
- start();
- } catch (Exception e) {
- log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName +
- " failed. Next retry in " + (retryDuration/1000) + "seconds", e);
- retryDuration = (long) (retryDuration * reconnectionProgressionFactor);
- if (retryDuration > maxReconnectDuration) {
- retryDuration = maxReconnectDuration;
- }
-
- try {
- Thread.sleep(retryDuration);
- } catch (InterruptedException ignore) {}
- }
- } while (!isActive());
- }
-
/**
* Start or re-start the Task Manager by shutting down any existing worker tasks and
* re-creating them. However, if this is STM is PAUSED, a start request is ignored.
@@ -208,16 +167,16 @@
"worker tasks of service : " + serviceName);
break;
case JMSConstants.CACHE_CONNECTION:
- log.debug("Only the JMS Connection will be cached and shared between poller " +
- "tasks of service : " + serviceName);
+ log.debug("Only the JMS Connection will be cached and shared between successive " +
+ "poller task invocations");
break;
case JMSConstants.CACHE_SESSION:
log.debug("The JMS Connection and Session will be cached and shared between " +
- "poller tasks of service : " + serviceName);
+ "successive poller task invocations");
break;
case JMSConstants.CACHE_CONSUMER:
log.debug("The JMS Connection, Session and MessageConsumer will be cached and " +
- "shared between poller tasks of service : " + serviceName);
+ "shared between successive poller task invocations");
break;
default : {
handleException("Invalid cache level : " + cacheLevel +
@@ -266,42 +225,6 @@
log.warn("Unable to shutdown all polling tasks of service : " + serviceName);
}
- if (sharedConsumer != null) {
- log.debug("Closing shared Consumer - service : " + serviceName);
- try {
- sharedConsumer.close();
- } catch (IllegalStateException ignore) {
- } catch (JMSException e) {
- logError("Error closing shared JMS consumer", e);
- } finally {
- sharedConsumer = null;
- }
- }
-
- if (sharedSession != null) {
- log.debug("Closing shared Session - service : " + serviceName);
- try {
- sharedSession.close();
- } catch (IllegalStateException ignore) {
- } catch (JMSException e) {
- logError("Error closing shared JMS session", e);
- } finally {
- sharedSession = null;
- }
- }
-
- if (sharedConnection != null) {
- log.debug("Closing shared Connection - service : " + serviceName);
- try {
- sharedConnection.close();
- } catch (IllegalStateException ignore) {
- } catch (JMSException e) {
- logError("Error closing shared JMS connection", e);
- } finally {
- sharedConnection = null;
- }
- }
-
if (state != STATE_FAILURE) {
state = STATE_STOPPED;
}
@@ -316,14 +239,6 @@
for (MessageListenerTask lstTask : pollingTasks) {
lstTask.pause();
}
-
- if (sharedConnection != null) {
- try {
- sharedConnection.stop();
- } catch (JMSException e) {
- logError("Error pausing shared JMS connection", e);
- }
- }
}
/**
@@ -333,14 +248,6 @@
for (MessageListenerTask lstTask : pollingTasks) {
lstTask.resume();
}
-
- if (sharedConnection != null) {
- try {
- sharedConnection.start();
- } catch (JMSException e) {
- logError("Error pausing shared JMS connection", e);
- }
- }
}
/**
@@ -371,7 +278,7 @@
/**
* The actual threads/tasks that perform message polling
*/
- private class MessageListenerTask implements Runnable {
+ private class MessageListenerTask implements Runnable, ExceptionListener {
/** The Connection used by the polling task */
private Connection connection = null;
@@ -398,7 +305,7 @@
*/
public void pause() {
if (isActive()) {
- if (connection != null && connection != sharedConnection) {
+ if (connection != null) {
try {
connection.stop();
} catch (JMSException e) {
@@ -413,7 +320,7 @@
* Resume this polling task
*/
public void resume() {
- if (connection != null && connection != sharedConnection) {
+ if (connection != null) {
try {
connection.start();
} catch (JMSException e) {
@@ -493,17 +400,16 @@
" is stopping after processing : " + messageCount + " messages");
}
+ closeConsumer(true);
+ closeSession(true);
+ closeConnection(true);
+
activeTaskCount--;
synchronized(pollingTasks) {
pollingTasks.remove(this);
}
// My time is up, so if I am going away, create another
scheduleNewTaskIfAppropriate();
-
- // close any non-shared resources
- closeConsumer(consumer);
- closeSession(session);
- closeConnection(connection);
}
/**
@@ -515,10 +421,10 @@
// get a new connection, session and consumer to prevent a conflict.
// If idle, it means we can re-use what we already have
- if (!idle) {
+ if (consumer == null) {
connection = getConnection();
- session = getSession(connection);
- consumer = getMessageConsumer(connection, session);
+ session = getSession();
+ consumer = getMessageConsumer();
if (log.isDebugEnabled()) {
log.debug("Preparing a Connection, Session and Consumer to read messages");
}
@@ -575,7 +481,7 @@
}
// close the consumer
- closeConsumer(consumer);
+ closeConsumer(false);
// if session was transacted, commit it or rollback
try {
@@ -617,9 +523,44 @@
" JTA txn for message : " + messageId + " from the session", e);
}
- closeSession(session);
- closeConnection(connection);
+ closeSession(false);
+ closeConnection(false);
+ }
+ }
+
+ /** Handle JMS Connection exceptions by re-initializing. A single connection failure could
+ * cause re-initialization of multiple MessageListenerTasks / Connections
+ */
+ public void onException(JMSException j) {
+
+ if (!isSTMActive()) {
+ return;
}
+
+ // if we failed while active, update state to show failure
+ setState(STATE_FAILURE);
+ log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks", j);
+
+ int r = 1;
+ long retryDuration = initialReconnectDuration;
+
+ do {
+ try {
+ log.info("Reconnection attempt : " + r + " for service : " + serviceName);
+ start();
+ } catch (Exception e) {
+ log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName +
+ " failed. Next retry in " + (retryDuration/1000) + "seconds", e);
+ retryDuration = (long) (retryDuration * reconnectionProgressionFactor);
+ if (retryDuration > maxReconnectDuration) {
+ retryDuration = maxReconnectDuration;
+ }
+
+ try {
+ Thread.sleep(retryDuration);
+ } catch (InterruptedException ignore) {}
+ }
+ } while (!isSTMActive());
}
protected void requestShutdown() {
@@ -633,151 +574,184 @@
protected boolean isTaskIdle() {
return idle;
}
- }
- // -------------- mundane private methods ----------------
- /**
- * Close the given Connection, hiding exceptions if any which are logged
- * @param connection the Connection to be closed
- */
- private void closeConnection(Connection connection) {
- try {
- if (connection != null && connection != sharedConnection) {
- if (log.isDebugEnabled()) {
- log.debug("Closing non-shared JMS connection for service : " + serviceName);
- }
- connection.close();
+ /**
+ * Get a Connection that could/should be used by this task - depends on the cache level to reuse
+ * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection
+ */
+ private Connection getConnection() {
+ if (connection == null || cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ connection = createConnection();
}
- } catch (JMSException e) {
- logError("Error closing JMS connection", e);
- } finally {
- connection = null;
+ return connection;
}
- }
- /**
- * Close the given Session, hiding exceptions if any which are logged
- * @param session the Session to be closed
- */
- private void closeSession(Session session) {
- try {
- if (session != null && session != sharedSession) {
- if (log.isDebugEnabled()) {
- log.debug("Closing non-shared JMS session for service : " + serviceName);
- }
- session.close();
+ /**
+ * Get a Session that could/should be used by this task - depends on the cache level to reuse
+ * @return the Session already held by this task if there is one and the cache level is
+ * CACHE_SESSION or higher, otherwise a new Session created using this task's
+ * Connection
+ */
+ private Session getSession() {
+ if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) {
+ session = createSession();
}
- } catch (JMSException e) {
- logError("Error closing JMS session", e);
- } finally {
- session = null;
+ return session;
}
- }
- /**
- * Close the given Consumer, hiding exceptions if any which are logged
- * @param consumer the Consumer to be closed
- */
- private void closeConsumer(MessageConsumer consumer) {
- try {
- if (consumer != null && consumer != sharedConsumer) {
- if (log.isDebugEnabled()) {
- log.debug("Closing non-shared JMS consumer for service : " + serviceName);
- }
- consumer.close();
- consumer = null;
+ /**
+ * Get a MessageConsumer that could/should be used by this task - depends on the cache
+ * level to reuse. Operates on this task's own Session and MessageConsumer fields, so
+ * it takes no arguments
+ * @return the MessageConsumer already held by this task if there is one and the cache
+ * level is CACHE_CONSUMER or higher, otherwise a new MessageConsumer created using
+ * this task's Session
+ */
+ private MessageConsumer getMessageConsumer() {
+ if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) {
+ consumer = createConsumer();
}
- } catch (JMSException e) {
- logError("Error closing JMS consumer", e);
- } finally {
- consumer = null;
+ return consumer;
}
- }
- /**
- * Get a Connection that could/should be used by this STM - depends on the cache level to reuse
- * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection
- */
- private Connection getConnection() {
- if (cacheLevel > JMSConstants.CACHE_NONE) {
- return getSharedConnection();
- } else {
- return createConnection();
+ /**
+ * Close this task's Connection unless it is being cached, logging any JMSException raised
+ * @param forced true to close the Connection even when the cache level would retain it
+ */
+ private void closeConnection(boolean forced) {
+ if (connection != null &&
+ (cacheLevel < JMSConstants.CACHE_CONNECTION || forced)) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing non-shared JMS connection for service : " + serviceName);
+ }
+ connection.close();
+ } catch (JMSException e) {
+ logError("Error closing JMS connection", e);
+ } finally {
+ connection = null;
+ }
+ }
}
- }
- /**
- * Get a Session that could/should be used by this STM - depends on the cache level to reuse
- * @param connection the connection (could be the shared connection) to use to create a Session
- * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session
- * created using the Connection passed, or a new/shared connection
- */
- private Session getSession(Connection connection) {
- if (cacheLevel > JMSConstants.CACHE_CONNECTION) {
- return getSharedSession();
- } else {
- return createSession((connection == null ? getConnection() : connection));
+ /**
+ * Close this task's Session unless it is being cached, logging any JMSException raised
+ * @param forced true to close the Session even when the cache level would retain it
+ */
+ private void closeSession(boolean forced) {
+ if (session != null &&
+ (cacheLevel < JMSConstants.CACHE_SESSION || forced)) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing non-shared JMS session for service : " + serviceName);
+ }
+ session.close();
+ } catch (JMSException e) {
+ logError("Error closing JMS session", e);
+ } finally {
+ session = null;
+ }
+ }
}
- }
- /**
- * Get a MessageConsumer that chould/should be used by this STM - depends on the cache level to
- * reuse
- * @param connection option Connection to be used
- * @param session optional Session to be used
- * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new
- * MessageConsumer possibly using the Connection and Session passed in
- */
- private MessageConsumer getMessageConsumer(Connection connection, Session session) {
- if (cacheLevel > JMSConstants.CACHE_SESSION) {
- return getSharedConsumer();
- } else {
- return createConsumer((session == null ? getSession(connection) : session));
+ /**
+ * Close this task's MessageConsumer unless it is being cached, logging any JMSException raised
+ * @param forced true to close the MessageConsumer even when the cache level would retain it
+ */
+ private void closeConsumer(boolean forced) {
+ if (consumer != null &&
+ (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing non-shared JMS consumer for service : " + serviceName);
+ }
+ consumer.close();
+ } catch (JMSException e) {
+ logError("Error closing JMS consumer", e);
+ } finally {
+ consumer = null;
+ }
+ }
}
- }
- /**
- * Get the shared Connection for this STM
- * @return shared Connection for the STM
- */
- private synchronized Connection getSharedConnection() {
- if (sharedConnection == null) {
- sharedConnection = createConnection();
- if (log.isDebugEnabled()) {
- log.debug("Created shared JMS Connection for service : " + serviceName);
+ /**
+ * Create a new Connection for this STM, using JNDI properties and credentials provided
+ * @return a new Connection for this STM, using JNDI properties and credentials provided
+ */
+ private Connection createConnection() {
+
+ try {
+ conFactory = JMSUtils.lookup(
+ getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName());
+ log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName());
+ } catch (NamingException e) {
+ handleException("Error looking up connection factory : " + getConnFactoryJNDIName() +
+ " using JNDI properties : " + jndiProperties, e);
+ }
+
+ Connection connection = null;
+ try {
+ connection = JMSUtils.createConnection(
+ conFactory,
+ jndiProperties.get(Context.SECURITY_PRINCIPAL),
+ jndiProperties.get(Context.SECURITY_CREDENTIALS),
+ isJmsSpec11(), isQueue());
+
+ connection.setExceptionListener(this);
+ connection.start();
+ log.info("JMS Connection for service : " + serviceName + " created and started");
+
+ } catch (JMSException e) {
+ handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() +
+ " using JNDI properties : " + jndiProperties, e);
}
+ return connection;
}
- return sharedConnection;
- }
- /**
- * Get the shared Session for the STM
- * @return shared Session for the STM
- */
- private synchronized Session getSharedSession() {
- if (sharedSession == null) {
- sharedSession = createSession(getSharedConnection());
- if (log.isDebugEnabled()) {
- log.debug("Created shared JMS Session for service : " + serviceName);
+ /**
+ * Create a new Session for this STM, using the Connection currently held by
+ * this task
+ * @return a new Session created using that Connection
+ */
+ private Session createSession() {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS Session for service : " + serviceName);
+ }
+ return JMSUtils.createSession(
+ connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue());
+
+ } catch (JMSException e) {
+ handleException("Error creating JMS session for service : " + serviceName, e);
}
+ return null;
}
- return sharedSession;
- }
- /**
- * Get the shared MessageConsumer for the STM
- * @return shared MessageConsumer for the STM
- */
- private synchronized MessageConsumer getSharedConsumer() {
- if (sharedConsumer == null) {
- sharedConsumer = createConsumer(getSharedSession());
- if (log.isDebugEnabled()) {
- log.debug("Created shared JMS MessageConsumer for service : " + serviceName);
+ /**
+ * Create a new MessageConsumer for this STM, using the Session currently held by
+ * this task
+ * @return a new MessageConsumer created using that Session
+ */
+ private MessageConsumer createConsumer() {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS MessageConsumer for service : " + serviceName);
+ }
+
+ return JMSUtils.createConsumer(
+ session, getDestination(), isQueue(),
+ (isSubscriptionDurable() && getDurableSubscriberName() == null ?
+ getDurableSubscriberName() : serviceName),
+ getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11());
+
+ } catch (JMSException e) {
+ handleException("Error creating JMS consumer for service : " + serviceName,e);
}
+ return null;
}
- return sharedConsumer;
}
+ // -------------- mundane private methods ----------------
/**
* Get the InitialContext for lookup using the JNDI parameters applicable to the service
* @return the InitialContext to be used
@@ -800,7 +774,7 @@
context = getInitialContext();
destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName());
if (log.isDebugEnabled()) {
- log.debug("JMS Destionation with JNDI name : " + getDestinationJNDIName() +
+ log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() +
" found for service " + serviceName);
}
} catch (NamingException e) {
@@ -847,85 +821,8 @@
return sharedUserTransaction;
}
- /**
- * Create a new Connection for this STM, using JNDI properties and credentials provided
- * @return a new Connection for this STM, using JNDI properties and credentials provided
- */
- private Connection createConnection() {
-
- try {
- conFactory = JMSUtils.lookup(
- getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName());
- log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName());
- } catch (NamingException e) {
- handleException("Error looking up connection factory : " + getConnFactoryJNDIName() +
- " using JNDI properties : " + jndiProperties, e);
- }
-
- Connection connection = null;
- try {
- connection = JMSUtils.createConnection(
- conFactory,
- jndiProperties.get(Context.SECURITY_PRINCIPAL),
- jndiProperties.get(Context.SECURITY_CREDENTIALS),
- isJmsSpec11(), isQueue());
-
- connection.setExceptionListener(this);
- connection.start();
- log.info("JMS Connection for service : " + serviceName + " created and started");
-
- } catch (JMSException e) {
- handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() +
- " using JNDI properties : " + jndiProperties, e);
- }
- return connection;
- }
-
- /**
- * Create a new Session for this STM
- * @param connection the Connection to be used
- * @return a new Session created using the Connection passed in
- */
- private Session createSession(Connection connection) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Creating a new JMS Session for service : " + serviceName);
- }
- return JMSUtils.createSession(
- connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue());
-
- } catch (JMSException e) {
- handleException("Error creating JMS session for service : " + serviceName, e);
- }
- return null;
- }
-
- /**
- * Create a new MessageConsumer for this STM
- * @param session the Session to be used
- * @return a new MessageConsumer created using the Session passed in
- */
- private MessageConsumer createConsumer(Session session) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Creating a new JMS MessageConsumer for service : " + serviceName);
- }
-
- return JMSUtils.createConsumer(
- session, getDestination(), isQueue(),
- (isSubscriptionDurable() && getDurableSubscriberName() == null ?
- getDurableSubscriberName() : serviceName),
- getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11());
-
- } catch (JMSException e) {
- handleException("Error creating JMS consumer for service : " + serviceName,e);
- }
- return null;
- }
-
-
// -------------------- trivial methods ---------------------
- private boolean isActive() {
+ private boolean isSTMActive() {
return state == STATE_STARTED;
}
@@ -1188,6 +1085,10 @@
return activeTaskCount;
}
+ public void setState(int state) {
+ this.state = state;
+ }
+
//--------------------- used for development testing---------------------------
/*public static void main(String[] args) throws Exception {
//org.apache.log4j.BasicConfigurator.configure();
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java Fri Nov 21 09:34:18 2008
@@ -71,20 +71,24 @@
this.messages = messages;
this.preloadMessages = preloadMessages;
}
+
+ private int concurrencyReached;
+ private final Object concurrencyReachedLock = new Object();
+ private final Object shutdownAwaitLock = new Object();
@Override
protected void runTest() throws Throwable {
int endpointCount = channels.length;
int expectedConcurrency = endpointCount * messages;
- final CountDownLatch shutdownLatch = new CountDownLatch(1);
- final CountDownLatch concurrencyReachedLatch = new CountDownLatch(expectedConcurrency);
-
final MessageReceiver messageReceiver = new MessageReceiver() {
public void receive(MessageContext msgContext) throws AxisFault {
- concurrencyReachedLatch.countDown();
+ synchronized (concurrencyReachedLock) {
+ concurrencyReached++;
+ concurrencyReachedLock.notifyAll();
+ }
try {
- shutdownLatch.await();
+ shutdownAwaitLock.wait();
} catch (InterruptedException ex) {
}
}
@@ -135,14 +139,25 @@
endpointResourceSets[i] = endpointResourceSet;
}
}
-
- if (!concurrencyReachedLatch.await(5, TimeUnit.SECONDS)) {
- fail("Concurrency reached is " + (expectedConcurrency -
- concurrencyReachedLatch.getCount()) + ", but expected " +
- expectedConcurrency);
+
+ long startTime = System.currentTimeMillis();
+ while (concurrencyReached < expectedConcurrency
+ && System.currentTimeMillis() < (startTime + 5000)) {
+ synchronized(concurrencyReachedLock) {
+ concurrencyReachedLock.wait();
+ }
+ }
+
+ synchronized(shutdownAwaitLock) {
+ shutdownAwaitLock.notifyAll();
}
+
+ if (concurrencyReached < expectedConcurrency) {
+ fail("Concurrency reached is " + concurrencyReached + ", but expected " +
+ expectedConcurrency);
+ }
+
} finally {
- shutdownLatch.countDown();
for (int i=0; i<endpointCount; i++) {
if (endpointResourceSets[i] != null) {
endpointResourceSets[i].tearDown();
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/tests/log4j.properties Fri Nov 21 09:34:18 2008
@@ -20,10 +20,12 @@
# log4j configuration file used by unit tests
log4j.rootCategory=DEBUG, CONSOLE
+#log4j.rootCategory=WARN, CONSOLE
log4j.category.org.apache.axis2.transport.jms=TRACE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.threshold=ERROR
+#log4j.appender.CONSOLE.threshold=TRACE
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%5p [%t] %c{1} %m%n
Modified: webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java?rev=719648&r1=719647&r2=719648&view=diff
==============================================================================
--- webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java (original)
+++ webservices/commons/trunk/scratch/asankha/transport/modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java Fri Nov 21 09:34:18 2008
@@ -53,6 +53,7 @@
suite.addExclude("(&(test=EchoXML)(replyDestType=topic)(endpoint=axis))");
// Example to run a few use cases.. please leave these commented out - asankha
+ //suite.addExclude("(|(test=AsyncXML)(test=MinConcurrency)(destType=topic)(broker=qpid)(destType=topic)(replyDestType=topic)(client=jms)(endpoint=mock)(cfOnSender=true))");
//suite.addExclude("(|(test=EchoXML)(destType=queue)(broker=qpid)(cfOnSender=true)(singleCF=false)(destType=queue)(client=jms)(endpoint=mock))");
//suite.addExclude("(|(test=EchoXML)(test=AsyncXML)(test=AsyncSwA)(test=AsyncTextPlain)(test=AsyncBinary)(test=AsyncSOAPLarge)(broker=qpid))");