You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/03/02 17:37:30 UTC

svn commit: r513835 - in /incubator/qpid/branches/perftesting/qpid/java: broker/src/main/java/org/apache/qpid/server/state/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/failover/ client/src/main/java/org/apac...

Author: ritchiem
Date: Fri Mar  2 08:37:28 2007
New Revision: 513835

URL: http://svn.apache.org/viewvc?view=rev&rev=513835
Log:
QPID-308
Broker:
AMQStateManager - Added extra logging to keep track of what is going on as the findStateTransitionHandler is recursive.

Client:
AMQConnection - Comment Changes. Added timeouts to connections.
AMQSession - Added timeout on closure
FailoverHandler - Comment changes and adjusted logging
AMQProtocolHandler - Comments changed and added timeouts to the syncWrite calls.
AMQProtocolSession - Added timeouts to writeFrame joins.
BlockingMethodFrameListener - Added timeouts to blockForFrame waits.
AMQStateManager - Added additional logging
ResetMessageListenerTest - Fixed logging level on a single log line.

Created ManualTests
Added MessageAgeAlert test case supplied by customer.

MessageRequeueTest - Moved QpidClientConnection to its own class
QpidClientConnection - Added based on a class from a customer.

AMQTimeoutException - Added new exception based on timeouts
AMQConstant - Added timeout constant
AMQQueueAlertTest - adjusted values, as the test would occasionally fail on my dual-core machine.
BrokerFillMemoryRun - added test to fill the broker's memory.

Added:
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java   (with props)
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java   (with props)
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java   (with props)
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java   (with props)
Modified:
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
    incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Mar  2 08:37:28 2007
@@ -35,23 +35,19 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 
 /**
- * The state manager is responsible for managing the state of the protocol session.
- * <p/>
- * For each AMQProtocolHandler there is a separate state manager.
- *
+ * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
+ * there is a separate state manager.
  */
 public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
 
-    /**
-     * The current state
-     */
+    /** The current state */
     private AMQState _currentState;
 
     /**
-     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
-     * The class must be a subclass of AMQFrame.
+     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
+     * AMQFrame.
      */
     private final Map<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
             new HashMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>();
@@ -159,9 +155,9 @@
     }
 
     public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
-                           AMQProtocolSession protocolSession,
-                           QueueRegistry queueRegistry,
-                           ExchangeRegistry exchangeRegistry) throws AMQException
+                                                            AMQProtocolSession protocolSession,
+                                                            QueueRegistry queueRegistry,
+                                                            ExchangeRegistry exchangeRegistry) throws AMQException
     {
         StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
         if (handler != null)
@@ -173,20 +169,19 @@
     }
 
     protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
-                                                                                             B frame)
+                                                                                               B frame)
             throws IllegalStateTransitionException
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Looking for state transition handler for frame " + frame.getClass());
+            _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + frame.getClass());
         }
         final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>
                 classToHandlerMap = _state2HandlersMap.get(currentState);
 
         if (classToHandlerMap == null)
         {
-            // if no specialised per state handler is registered look for a
-            // handler registered for "all" states
+            _logger.debug("No specialised per state handler is registered look for a handler registered for 'all' states");
             return findStateTransitionHandler(null, frame);
         }
         final StateAwareMethodListener<B> handler = (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
@@ -199,8 +194,7 @@
             }
             else
             {
-                // if no specialised per state handler is registered look for a
-                // handler registered for "all" states
+                _logger.debug("No specialised per state handler is registered look for a handler registered for 'all' states");
                 return findStateTransitionHandler(null, frame);
             }
         }

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Mar  2 08:37:28 2007
@@ -64,23 +64,23 @@
 {
     private static final Logger _logger = Logger.getLogger(AMQConnection.class);
 
+    private static final long MAXIMUM_WAIT_TIME = 30000l;
+
     private AtomicInteger _idFactory = new AtomicInteger(0);
 
     /**
-     * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
-     * This must be held by any child objects of this connection such as the session, producers and consumers.
+     * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
+     * held by any child objects of this connection such as the session, producers and consumers.
      */
     private final Object _failoverMutex = new Object();
 
     /**
-     * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
-     * per session and we must prevent the client from opening too many. Zero means unlimited.
+     * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
+     * and we must prevent the client from opening too many. Zero means unlimited.
      */
     private long _maximumChannelCount;
 
-    /**
-     * The maximum size of frame supported by the server
-     */
+    /** The maximum size of frame supported by the server */
     private long _maximumFrameSize;
 
     /**
@@ -90,26 +90,18 @@
      */
     private AMQProtocolHandler _protocolHandler;
 
-    /**
-     * Maps from session id (Integer) to AMQSession instance
-     */
+    /** Maps from session id (Integer) to AMQSession instance */
     private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap    
 
     private String _clientName;
 
-    /**
-     * The user name to use for authentication
-     */
+    /** The user name to use for authentication */
     private String _username;
 
-    /**
-     * The password to use for authentication
-     */
+    /** The password to use for authentication */
     private String _password;
 
-    /**
-     * The virtual path to connect to on the AMQ server
-     */
+    /** The virtual path to connect to on the AMQ server */
     private String _virtualHost;
 
     private ExceptionListener _exceptionListener;
@@ -119,14 +111,12 @@
     private ConnectionURL _connectionURL;
 
     /**
-     * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
-     * message publication.
+     * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
+     * publication.
      */
     private boolean _started;
 
-    /**
-     * Policy dictating how to failover
-     */
+    /** Policy dictating how to failover */
     private FailoverPolicy _failoverPolicy;
 
     /*
@@ -353,8 +343,8 @@
     /**
      * Get the details of the currently active broker
      *
-     * @return null if no broker is active (i.e. no successful connection has been made, or
-     *         the BrokerDetail instance otherwise
+     * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
+     *         otherwise
      */
     public BrokerDetails getActiveBrokerDetails()
     {
@@ -507,12 +497,14 @@
     }
 
     /**
-     * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions
-     * where specified in the JMS spec
+     * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
+     * the JMS spec
      *
      * @param transacted
      * @param acknowledgeMode
+     *
      * @return QueueSession
+     *
      * @throws JMSException
      */
     public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -521,12 +513,14 @@
     }
 
     /**
-     * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions
-     * where specified in the JMS spec
+     * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
+     * the JMS spec
      *
      * @param transacted
      * @param acknowledgeMode
+     *
      * @return TopicSession
+     *
      * @throws JMSException
      */
     public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -623,14 +617,19 @@
 
     public void close() throws JMSException
     {
+        close(-1);
+    }
+
+    public void close(long timeout) throws JMSException
+    {
         synchronized (getFailoverMutex())
         {
             if (!_closed.getAndSet(true))
             {
                 try
                 {
-                    closeAllSessions(null);
-                    _protocolHandler.closeConnection();
+                    closeAllSessions(null, timeout);
+                    _protocolHandler.closeConnection(timeout);
                 }
                 catch (AMQException e)
                 {
@@ -641,11 +640,9 @@
     }
 
     /**
-     * Marks all sessions and their children as closed without sending any protocol messages. Useful when
-     * you need to mark objects "visible" in userland as closed after failover or other significant event that
-     * impacts the connection.
-     * <p/>
-     * The caller must hold the failover mutex before calling this method.
+     * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
+     * mark objects "visible" in userland as closed after failover or other significant event that impacts the
+     * connection. <p/> The caller must hold the failover mutex before calling this method.
      */
     private void markAllSessionsClosed()
     {
@@ -663,11 +660,10 @@
     /**
      * Close all the sessions, either due to normal connection closure or due to an error occurring.
      *
-     * @param cause if not null, the error that is causing this shutdown
-     *              <p/>
-     *              The caller must hold the failover mutex before calling this method.
+     * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
+     *              before calling this method.
      */
-    private void closeAllSessions(Throwable cause) throws JMSException
+    private void closeAllSessions(Throwable cause, long timeout) throws JMSException
     {
         final LinkedList sessionCopy = new LinkedList(_sessions.values());
         final Iterator it = sessionCopy.iterator();
@@ -683,7 +679,7 @@
             {
                 try
                 {
-                    session.close();
+                    session.close(timeout);
                 }
                 catch (JMSException e)
                 {
@@ -788,7 +784,7 @@
     {
         return _protocolHandler;
     }
-    
+
     public boolean started()
     {
         return _started;
@@ -814,6 +810,7 @@
      * Fire the preFailover event to the registered connection listener (if any)
      *
      * @param redirect true if this is the result of a redirect request rather than a connection error
+     *
      * @return true if no listener or listener does not veto change
      */
     public boolean firePreFailover(boolean redirect)
@@ -827,10 +824,11 @@
     }
 
     /**
-     * Fire the preResubscribe event to the registered connection listener (if any). If the listener
-     * vetoes resubscription then all the sessions are closed.
+     * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
+     * resubscription then all the sessions are closed.
      *
      * @return true if no listener or listener does not veto resubscription.
+     *
      * @throws JMSException
      */
     public boolean firePreResubscribe() throws JMSException
@@ -850,9 +848,7 @@
         }
     }
 
-    /**
-     * Fires a failover complete event to the registered connection listener (if any).
-     */
+    /** Fires a failover complete event to the registered connection listener (if any). */
     public void fireFailoverComplete()
     {
         if (_connectionListener != null)
@@ -862,8 +858,8 @@
     }
 
     /**
-     * In order to protect the consistency of the connection and its child sessions, consumers and producers,
-     * the "failover mutex" must be held when doing any operations that could be corrupted during failover.
+     * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
+     * "failover mutex" must be held when doing any operations that could be corrupted during failover.
      *
      * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
      */
@@ -873,8 +869,8 @@
     }
 
     /**
-     * If failover is taking place this will block until it has completed. If failover
-     * is not taking place it will return immediately.
+     * If failover is taking place this will block until it has completed. If failover is not taking place it will
+     * return immediately.
      *
      * @throws InterruptedException
      */
@@ -884,10 +880,9 @@
     }
 
     /**
-     * Invoked by the AMQProtocolSession when a protocol session exception has occurred.
-     * This method sends the exception to a JMS exception listener, if configured, and
-     * propagates the exception to sessions, which in turn will propagate to consumers.
-     * This allows synchronous consumers to have exceptions thrown to them.
+     * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
+     * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
+     * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
      *
      * @param cause the exception
      */
@@ -937,7 +932,7 @@
             {
                 _logger.info("Closing AMQConnection due to :" + cause.getMessage());
                 _closed.set(true);
-                closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor.
+                closeAllSessions(cause, MAXIMUM_WAIT_TIME); // FIXME: when doing this end up with RejectedExecutionException from executor.
             }
             catch (JMSException e)
             {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Mar  2 08:37:28 2007
@@ -338,7 +338,7 @@
                 {
                     _queue.add(msg);
                 }
-                
+
                 if (stopped)
                 {
                     _dispatcher.setConnectionStopped(stopped);
@@ -648,6 +648,11 @@
 
     public void close() throws JMSException
     {
+        close(-1);
+    }
+
+    public void close(long timeout) throws JMSException
+    {
         // We must close down all producers and consumers in an orderly fashion. This is the only method
         // that can be called from a different thread of control from the one controlling the session
         synchronized (_connection.getFailoverMutex())
@@ -657,7 +662,7 @@
             {
 
                 // we pass null since this is not an error case
-                closeProducersAndConsumers(null);                
+                closeProducersAndConsumers(null);
 
                 try
                 {
@@ -671,7 +676,8 @@
                                                                            0,    // methodId
                                                                            AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
                                                                            "JMS client closing channel");    // replyText
-                    _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
+
+                    _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
                     // When control resumes at this point, a reply will have been received that
                     // indicates the broker has closed the channel successfully
 
@@ -696,8 +702,10 @@
      *
      * @param amqe the exception, may be null to indicate no error has occurred
      */
-    private void closeProducersAndConsumers(AMQException amqe)
+    private void closeProducersAndConsumers(AMQException amqe) throws JMSException
     {
+        JMSException jmse = null;
+
         try
         {
             closeProducers();
@@ -705,6 +713,7 @@
         catch (JMSException e)
         {
             _logger.error("Error closing session: " + e, e);
+            jmse = e;
         }
         try
         {
@@ -713,6 +722,15 @@
         catch (JMSException e)
         {
             _logger.error("Error closing session: " + e, e);
+            if (jmse == null)
+            {
+                jmse = e;
+            }
+        }
+
+        if (jmse != null)
+        {
+            throw jmse;
         }
     }
 
@@ -727,7 +745,7 @@
      *
      * @param e the exception that caused this session to be closed. Null causes the
      */
-    public void closed(Throwable e)
+    public void closed(Throwable e) throws JMSException
     {
         synchronized (_connection.getFailoverMutex())
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Fri Mar  2 08:37:28 2007
@@ -31,13 +31,11 @@
 import java.util.concurrent.CountDownLatch;
 
 /**
- * When failover is required, we need a separate thread to handle the establishment of the new connection and
- * the transfer of subscriptions.
- * </p>
- * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor
- * thread. One significant task is the connection setup which involves a protocol exchange until a particular state
- * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means
- * the IO processor is not able to do anything at all.
+ * When failover is required, we need a separate thread to handle the establishment of the new connection and the
+ * transfer of subscriptions. </p> The reason this needs to be a separate thread is because you cannot do this work
+ * inside the MINA IO processor thread. One significant task is the connection setup which involves a protocol exchange
+ * until a particular state is achieved. However if you do this in the MINA thread, you have to block until the state is
+ * achieved which means the IO processor is not able to do anything at all.
  */
 public class FailoverHandler implements Runnable
 {
@@ -46,14 +44,10 @@
     private final IoSession _session;
     private AMQProtocolHandler _amqProtocolHandler;
 
-    /**
-     * Used where forcing the failover host
-     */
+    /** Used where forcing the failover host */
     private String _host;
 
-    /**
-     * Used where forcing the failover port
-     */
+    /** Used where forcing the failover port */
     private int _port;
 
     public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
@@ -82,8 +76,6 @@
         // client code which runs in a separate thread.
         synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
         {
-            _logger.info("Starting failover process");
-
             // We switch in a new state manager temporarily so that the interaction to get to the "connection open"
             // state works, without us having to terminate any existing "state waiters". We could theoretically
             // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
@@ -92,6 +84,8 @@
             _amqProtocolHandler.setStateManager(new AMQStateManager());
             if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
             {
+                _logger.info("Starting failover process");
+
                 _amqProtocolHandler.setStateManager(existingStateManager);
                 if (_host != null)
                 {
@@ -105,6 +99,9 @@
                 _amqProtocolHandler.setFailoverLatch(null);
                 return;
             }
+
+            _logger.info("Starting failover process");
+
             boolean failoverSucceeded;
             // when host is non null we have a specified failover host otherwise we all the client to cycle through
             // all specified hosts
@@ -123,7 +120,7 @@
                 _amqProtocolHandler.setStateManager(existingStateManager);
                 _amqProtocolHandler.getConnection().exceptionReceived(
                         new AMQDisconnectedException("Server closed connection and no failover " +
-                                "was successful"));
+                                                     "was successful"));
             }
             else
             {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Mar  2 08:37:28 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
@@ -58,21 +59,18 @@
     private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
 
     /**
-     * The connection that this protocol handler is associated with. There is a 1-1
-     * mapping between connection instances and protocol handler instances.
+     * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
+     * and protocol handler instances.
      */
     private AMQConnection _connection;
 
     /**
-     * Used only when determining whether to add the SSL filter or not. This should be made more
-     * generic in future since we will potentially have many transport layer options
+     * Used only when determining whether to add the SSL filter or not. This should be made more generic in future since
+     * we will potentially have many transport layer options
      */
     private boolean _useSSL;
 
-    /**
-     * Our wrapper for a protocol session that provides access to session values
-     * in a typesafe manner.
-     */
+    /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
     private volatile AMQProtocolSession _protocolSession;
 
     private AMQStateManager _stateManager = new AMQStateManager();
@@ -86,6 +84,8 @@
      */
     private FailoverHandler _failoverHandler;
 
+    private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+
     /**
      * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
      * attempting failover where it is failing.
@@ -175,6 +175,7 @@
      * sessionClosed() depending on whether we were trying to send data at the time of failure.
      *
      * @param session
+     *
      * @throws Exception
      */
     public void sessionClosed(IoSession session) throws Exception
@@ -229,9 +230,7 @@
         _logger.info("Protocol Session [" + this + "] closed");
     }
 
-    /**
-     * See {@link FailoverHandler} to see rationale for separate thread.
-     */
+    /** See {@link FailoverHandler} to see rationale for separate thread. */
     private void startFailoverThread()
     {
         Thread failoverThread = new Thread(_failoverHandler);
@@ -294,10 +293,9 @@
     }
 
     /**
-     * There are two cases where we have other threads potentially blocking for events to be handled by this
-     * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
-     * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
-     * react appropriately.
+     * There are two cases where we have other threads potentially blocking for events to be handled by this class.
+     * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
+     * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
      *
      * @param e the exception to propagate
      */
@@ -406,8 +404,8 @@
     }
 
     /**
-     * Convenience method that writes a frame to the protocol session. Equivalent
-     * to calling getProtocolSession().write().
+     * Convenience method that writes a frame to the protocol session. Equivalent to calling
+     * getProtocolSession().write().
      *
      * @param frame the frame to write
      */
@@ -422,9 +420,8 @@
     }
 
     /**
-     * Convenience method that writes a frame to the protocol session and waits for
-     * a particular response. Equivalent to calling getProtocolSession().write() then
-     * waiting for the response.
+     * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+     * calling getProtocolSession().write() then waiting for the response.
      *
      * @param frame
      * @param listener the blocking listener. Note the calling thread will block.
@@ -433,12 +430,27 @@
                                                             BlockingMethodFrameListener listener)
             throws AMQException
     {
+        return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
+    }
+
+    /**
+     * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+     * calling getProtocolSession().write() then waiting for the response.
+     *
+     * @param frame
+     * @param listener the blocking listener. Note the calling thread will block.
+     */
+    private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+                                                            BlockingMethodFrameListener listener,
+                                                            long timeout)
+            throws AMQException
+    {
         try
         {
             _frameListeners.add(listener);
             _protocolSession.writeFrame(frame);
 
-            AMQMethodEvent e = listener.blockForFrame();
+            AMQMethodEvent e = listener.blockForFrame(timeout);
             return e;
             // When control resumes before this line, a reply will have been received
             // that matches the criteria defined in the blocking listener
@@ -451,19 +463,23 @@
 
     }
 
-    /**
-     * More convenient method to write a frame and wait for it's response.
-     */
+    /** More convenient method to write a frame and wait for its response. */
     public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
     {
+        return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
+    }
+
+    /** More convenient method to write a frame and wait for its response. */
+    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
+    {
         return writeCommandFrameAndWaitForReply(frame,
-                                                new SpecificMethodFrameListener(frame.channel, responseClass));
+                                                new SpecificMethodFrameListener(frame.channel, responseClass),
+                                                timeout);
     }
 
     /**
-     * Convenience method to register an AMQSession with the protocol handler. Registering
-     * a session with the protocol handler will ensure that messages are delivered to the
-     * consumer(s) on that session.
+     * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
+     * handler will ensure that messages are delivered to the consumer(s) on that session.
      *
      * @param channelId the channel id of the session
      * @param session   the session instance.
@@ -490,33 +506,40 @@
 
     public void closeConnection() throws AMQException
     {
+        closeConnection(-1);
+    }
+
+    public void closeConnection(long timeout) throws AMQException
+    {
         _stateManager.changeState(AMQState.CONNECTION_CLOSING);
 
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
-            (byte)8, (byte)0,	// AMQP version (major, minor)
-            0,	// classId
-            0,	// methodId
-            AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
-            "JMS client is closing the connection.");	// replyText
-        syncWrite(frame, ConnectionCloseOkBody.class);
-
-        _protocolSession.closeProtocolSession();
+                                                                  (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                  0,    // classId
+                                                                  0,    // methodId
+                                                                  AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
+                                                                  "JMS client is closing the connection.");    // replyText
+        try
+        {
+            syncWrite(frame, ConnectionCloseOkBody.class, timeout);
+            _protocolSession.closeProtocolSession();
+        }
+        catch (AMQTimeoutException e)
+        {
+            _protocolSession.closeProtocolSession(false);
+        }
     }
 
-    /**
-     * @return the number of bytes read from this protocol session
-     */
+    /** @return the number of bytes read from this protocol session */
     public long getReadBytes()
     {
         return _protocolSession.getIoSession().getReadBytes();
     }
 
-    /**
-     * @return the number of bytes written to this protocol session
-     */
+    /** @return the number of bytes written to this protocol session */
     public long getWrittenBytes()
     {
         return _protocolSession.getIoSession().getWrittenBytes();

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Mar  2 08:37:28 2007
@@ -46,13 +46,12 @@
 /**
  * Wrapper for protocol session that provides type-safe access to session attributes.
  *
- * The underlying protocol session is still available but clients should not
- * use it to obtain session attributes.
+ * The underlying protocol session is still available but clients should not use it to obtain session attributes.
  */
 public class AMQProtocolSession implements ProtocolVersionList
 {
 
-    protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
+    protected static final int WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 30;
 
     protected static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
 
@@ -69,33 +68,29 @@
     protected WriteFuture _lastWriteFuture;
 
     /**
-     * The handler from which this session was created and which is used to handle protocol events.
-     * We send failover events to the handler.
+     * The handler from which this session was created and which is used to handle protocol events. We send failover
+     * events to the handler.
      */
     protected final AMQProtocolHandler _protocolHandler;
 
-    /**
-     * Maps from the channel id to the AMQSession that it represents.
-     */
+    /** Maps from the channel id to the AMQSession that it represents. */
     protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
 
     protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
 
     /**
-     * Maps from a channel id to an unprocessed message. This is used to tie together the
-     * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+     * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
+     * first) with the subsequent content header and content bodies.
      */
     protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
 
-    /**
-     * Counter to ensure unique queue names
-     */
+    /** Counter to ensure unique queue names */
     protected int _queueId = 1;
     protected final Object _queueIdLock = new Object();
 
     /**
-     * No-arg constructor for use by test subclass - has to initialise final vars
-     * NOT intended for use other then for test
+     * No-arg constructor for use by test subclass - has to initialise final vars. NOT intended for use other than for
+     * test.
      */
     public AMQProtocolSession()
     {
@@ -167,8 +162,8 @@
 
     /**
      * Store the SASL client currently being used for the authentication handshake
-     * @param client if non-null, stores this in the session. if null clears any existing client
-     * being stored
+     *
+     * @param client if non-null, stores this in the session. if null clears any existing client being stored
      */
     public void setSaslClient(SaslClient client)
     {
@@ -197,9 +192,11 @@
     }
 
     /**
-     * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
-     * This is invoked on the MINA dispatcher thread.
+     * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
+     * dispatcher thread.
+     *
      * @param message
+     *
      * @throws AMQException if this was not expected
      */
     public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
@@ -254,10 +251,10 @@
     }
 
     /**
-     * Deliver a message to the appropriate session, removing the unprocessed message
-     * from our map
+     * Deliver a message to the appropriate session, removing the unprocessed message from our map
+     *
      * @param channelId the channel id the message should be delivered to
-     * @param msg the message
+     * @param msg       the message
      */
     private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
     {
@@ -267,8 +264,8 @@
     }
 
     /**
-     * Convenience method that writes a frame to the protocol session. Equivalent
-     * to calling getProtocolSession().write().
+     * Convenience method that writes a frame to the protocol session. Equivalent to calling
+     * getProtocolSession().write().
      *
      * @param frame the frame to write
      */
@@ -282,7 +279,7 @@
         WriteFuture f = _minaProtocolSession.write(frame);
         if (wait)
         {
-            f.join();
+            f.join(WRITE_FUTURE_JOIN_TIMEOUT);
         }
         else
         {
@@ -316,6 +313,7 @@
 
     /**
      * Starts the process of closing a session
+     *
      * @param session the AMQSession being closed
      */
     public void closeSession(AMQSession session)
@@ -333,23 +331,30 @@
     }
 
     /**
-     * Called from the ChannelClose handler when a channel close frame is received.
-     * This method decides whether this is a response or an initiation. The latter
-     * case causes the AMQSession to be closed and an exception to be thrown if
+     * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
+     * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
      * appropriate.
+     *
      * @param channelId the id of the channel (session)
-     * @return true if the client must respond to the server, i.e. if the server
-     * initiated the channel close, false if the channel close is just the server
-     * responding to the client's earlier request to close the channel.
+     *
+     * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
+     *         the channel close is just the server responding to the client's earlier request to close the channel.
      */
-    public boolean channelClosed(int channelId, int code, String text)
+    public boolean channelClosed(int channelId, int code, String text) throws AMQException
     {
         final Integer chId = channelId;
         // if this is not a response to an earlier request to close the channel
         if (_closingChannels.remove(chId) == null)
         {
             final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
-            session.closed(new AMQException(_logger, code, text));
+            try
+            {
+                session.closed(new AMQException(_logger, code, text));
+            }
+            catch (JMSException e)
+            {
+                throw new AMQException("JMSException received while closing session", e);
+            }
             return true;
         }
         else
@@ -365,15 +370,20 @@
 
     public void closeProtocolSession()
     {
+        closeProtocolSession(true);
+    }
+
+    public void closeProtocolSession(boolean waitLast)
+    {
         _logger.debug("Waiting for last write to join.");
-        if (_lastWriteFuture != null)
+        if (waitLast && _lastWriteFuture != null)
         {
-            _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+            _lastWriteFuture.join(WRITE_FUTURE_JOIN_TIMEOUT);
         }
 
         _logger.debug("Closing protocol session");
         final CloseFuture future = _minaProtocolSession.close();
-        future.join();
+        future.join(WRITE_FUTURE_JOIN_TIMEOUT);
     }
 
     public void failover(String host, int port)
@@ -384,19 +394,16 @@
     protected String generateQueueName()
     {
         int id;
-        synchronized(_queueIdLock)
+        synchronized (_queueIdLock)
         {
             id = _queueId++;
         }
         //get rid of / and : and ; from address for spec conformance
-        String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:","");
+        String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
         return "tmp_" + localAddress + "_" + id;
     }
 
-    /**
-     *
-     * @param delay delay in seconds (not ms)
-     */
+    /** @param delay delay in seconds (not ms) */
     void initHeartbeats(int delay)
     {
         if (delay > 0)

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Fri Mar  2 08:37:28 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.client.protocol;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.framing.AMQMethodBody;
 
@@ -33,8 +34,8 @@
     private final Object _lock = new Object();
 
     /**
-     * This is set if there is an exception thrown from processCommandFrame and the
-     * exception is rethrown to the caller of blockForFrame()
+     * This is set if there is an exception thrown from processCommandFrame and the exception is rethrown to the caller
+     * of blockForFrame()
      */
     private volatile Exception _error;
 
@@ -48,11 +49,13 @@
     }
 
     /**
-     * This method is called by the MINA dispatching thread. Note that it could
-     * be called before blockForFrame() has been called.
+     * This method is called by the MINA dispatching thread. Note that it could be called before blockForFrame() has
+     * been called.
      *
      * @param evt the frame event
+     *
      * @return true if the listener has dealt with this frame
+     *
      * @throws AMQException
      */
     public boolean methodReceived(AMQMethodEvent evt) throws AMQException
@@ -85,10 +88,8 @@
         }
     }
 
-    /**
-     * This method is called by the thread that wants to wait for a frame.
-     */
-    public AMQMethodEvent blockForFrame() throws AMQException
+    /** This method is called by the thread that wants to wait for a frame. */
+    public AMQMethodEvent blockForFrame(long timeout) throws AMQException
     {
         synchronized (_lock)
         {
@@ -96,7 +97,20 @@
             {
                 try
                 {
-                    _lock.wait();
+                     if (timeout == -1)
+                    {
+                        _lock.wait();
+                    }
+                    else
+                    {
+
+                        _lock.wait(timeout);
+                        if (!_ready)
+                        {
+                            _error = new AMQTimeoutException("Server did not respond in a timely fashion");
+                            _ready = true;
+                        }
+                    }
                 }
                 catch (InterruptedException e)
                 {
@@ -125,8 +139,8 @@
     }
 
     /**
-     * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
-     * class to avoid code repetition but again is only called by the MINA dispatcher thread.
+     * This is a callback, called by the MINA dispatcher thread only. It is also called from within this class to avoid
+     * code repetition but again is only called by the MINA dispatcher thread.
      *
      * @param e
      */

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Fri Mar  2 08:37:28 2007
@@ -33,22 +33,19 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 
 /**
- * The state manager is responsible for managing the state of the protocol session.
- * <p/>
- * For each AMQProtocolHandler there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
+ * there is a separate state manager.
  */
 public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
 
-    /**
-     * The current state
-     */
+    /** The current state */
     private AMQState _currentState;
 
     /**
-     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
-     * The class must be a subclass of AMQFrame.
+     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
+     * AMQFrame.
      */
     private final Map _state2HandlersMap = new HashMap();
 
@@ -159,14 +156,13 @@
         final Class clazz = frame.getClass();
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Looking for state transition handler for frame " + clazz);
+            _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz);
         }
         final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState);
 
         if (classToHandlerMap == null)
         {
-            // if no specialised per state handler is registered look for a
-            // handler registered for "all" states
+            _logger.debug("no specialised per state handler is registered look for a handler registered for 'all' states");
             return findStateTransitionHandler(null, frame);
         }
         final StateAwareMethodListener handler = (StateAwareMethodListener) classToHandlerMap.get(clazz);
@@ -174,13 +170,12 @@
         {
             if (currentState == null)
             {
-                _logger.debug("No state transition handler defined for receiving frame " + frame);
+                _logger.debug("No state[" + currentState + "] transition handler defined for receiving frame " + frame);
                 return null;
             }
             else
             {
-                // if no specialised per state handler is registered look for a
-                // handler registered for "all" states
+                _logger.debug("No specialised per state handler is registered look for a handler registered for 'all' states");
                 return findStateTransitionHandler(null, frame);
             }
         }
@@ -214,7 +209,7 @@
             }
             if (_currentState != s)
             {
-                _logger.warn("State not achieved within permitted time.  Current state " + _currentState + ", desired state: " + s);                
+                _logger.warn("State not achieved within permitted time.  Current state " + _currentState + ", desired state: " + s);
                 throw new AMQException("State not achieved within permitted time.  Current state " + _currentState + ", desired state: " + s);
             }
         }

Added: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java?view=auto&rev=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java Fri Mar  2 08:37:28 2007
@@ -0,0 +1,102 @@
+package org.apache.qpid.ManualTests;
+
+import junit.framework.TestCase;
+import org.apache.qpid.testutil.QpidClientConnection;
+
+import javax.jms.JMSException;
+
+/** NF101: heap exhaustion behaviour
+ *  Provided by customer 
+ */
+public class BrokerFillMemoryRun extends TestCase
+{
+    protected QpidClientConnection conn;
+    protected final String vhost = "/test";
+    protected final String queue = "direct://amq.direct//queue";
+
+    protected String hundredK;
+    protected String megabyte;
+
+    protected final String BROKER = "tcp://localhost:5672";
+
+
+    protected void log(String msg)
+    {
+        System.out.println(msg);
+    }
+
+    protected String generatePayloadOfSize(Integer numBytes)
+    {
+        return new String(new byte[numBytes]);
+    }
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        conn = new QpidClientConnection(BROKER);
+        conn.setVirtualHost(vhost);
+
+        conn.connect();
+        // clear queue
+        log("setup: clearing test queue");
+        conn.consume(queue, 2000);
+
+        hundredK = generatePayloadOfSize(1024 * 100);
+        megabyte = generatePayloadOfSize(1024 * 1024);
+    }
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        try
+        {
+            conn.disconnect();
+        }
+        catch (JMSException d)
+        {
+            log("disconnectJMSE:" + d.getMessage());
+        }
+    }
+
+
+    /** PUT at maximum rate (although we commit after each PUT) until failure */
+    public void testUntilFailure() throws Exception
+    {
+        int copies = 0;
+        int total = 0;
+        String payload = hundredK;
+        int size = payload.getBytes().length + String.valueOf("0").getBytes().length;
+        while (true)
+        {
+            conn.put(queue, payload, 1);
+            copies++;
+            total += size;
+            log("put copy " + copies + " OK for total bytes: " + total);
+        }
+    }
+
+    /** PUT at lower rate (5 per second) until failure */
+    public void testUntilFailureWithDelays() throws Exception
+    {
+        try
+        {
+            int copies = 0;
+            int total = 0;
+            String payload = hundredK;
+            int size = payload.getBytes().length + String.valueOf("0").getBytes().length;
+            while (true)
+            {
+                conn.put(queue, payload, 1);
+                copies++;
+                total += size;
+                log("put copy " + copies + " OK for total bytes: " + total);
+                Thread.sleep(200);
+            }
+        }
+        catch (JMSException e)
+        {
+            log("putJMSE:" + e.getMessage());
+        }
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java?view=auto&rev=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java Fri Mar  2 08:37:28 2007
@@ -0,0 +1,61 @@
+package org.apache.qpid.ManualTests;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.testutil.QpidClientConnection;
+import org.apache.qpid.client.transport.TransportConnection;
+
+/** FT401: alert on message age
+ *  Provided by customer
+ */
+public class MessageAgeAlert extends TestCase
+{
+    protected QpidClientConnection conn;
+    protected final String queue = "direct://amq.direct//queue";
+    protected final String vhost = "/test";
+
+    protected String payload = "xyzzy";
+
+    protected Integer agePeriod = 30000;
+
+    protected final String BROKER = "localhost";
+
+    protected void log(String msg)
+    {
+        System.out.println(msg);
+    }
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+//        TransportConnection.createVMBroker(1);
+
+        conn = new QpidClientConnection(BROKER);
+        conn.setVirtualHost(vhost);
+
+        conn.connect();
+    }
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        conn.disconnect();
+//        TransportConnection.killVMBroker(1);
+    }
+
+    /**
+     * put a message and wait for age alert
+     *
+     * @throws Exception on error
+     */
+    public void testSinglePutThenWait() throws Exception
+    {
+        conn.put(queue, payload, 1);
+        log("waiting ms: " + agePeriod);
+        Thread.sleep(agePeriod);
+        log("wait period over");
+        conn.put(queue, payload, 1);
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java Fri Mar  2 08:37:28 2007
@@ -239,7 +239,7 @@
 
         try
         {
-            _logger.error("Send additional messages");
+            _logger.info("Send additional messages");
 
             for (int msg = 0; msg < MSG_COUNT; msg++)
             {

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Fri Mar  2 08:37:28 2007
@@ -40,9 +40,14 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.testutil.QpidClientConnection;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
 
+/**
+ * based on FT304 - Competing consumers
+ * provided by customer 
+ */
 public class MessageRequeueTest extends TestCase
 {
 
@@ -67,7 +72,7 @@
         Logger session = Logger.getLogger("org.apache.qpid.client.AMQSession");
         session.setLevel(Level.ERROR);
 
-        QpidClientConnection conn = new QpidClientConnection();
+        QpidClientConnection conn = new QpidClientConnection(BROKER);
 
         conn.connect();
         // clear queue
@@ -77,7 +82,7 @@
         conn.put(queue, payload, numTestMessages);
         // close this connection
         conn.disconnect();
-    } 
+    }
 
     protected void tearDown() throws Exception
     {
@@ -139,7 +144,7 @@
             try
             {
                 _logger.info("consumer-" + id + ": starting");
-                QpidClientConnection conn = new QpidClientConnection();
+                QpidClientConnection conn = new QpidClientConnection(BROKER);
 
                 conn.connect();
 
@@ -179,252 +184,4 @@
             return id;
         }
     }
-
-
-    public class QpidClientConnection implements ExceptionListener
-    {
-        private boolean transacted = true;
-        private int ackMode = Session.CLIENT_ACKNOWLEDGE;
-        private Connection connection;
-
-        private String virtualHost;
-        private String brokerlist;
-        private int prefetch;
-        protected Session session;
-        protected boolean connected;
-
-        public QpidClientConnection()
-        {
-            super();
-            setVirtualHost("/test");
-            setBrokerList(BROKER);
-            setPrefetch(5000);
-        }
-
-
-        public void connect() throws JMSException
-        {
-            if (!connected)
-            {
-                /*
-                * amqp://[user:pass@][clientid]/virtualhost?
-                * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
-                * [&failover='method[?option='value'[&option='value']]']
-                * [&option='value']"
-                */
-                String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
-                try
-                {
-                    AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
-                    _logger.info("connecting to Qpid :" + brokerUrl);
-                    connection = factory.createConnection();
-
-                    // register exception listener
-                    connection.setExceptionListener(this);
-
-                    session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
-
-
-                    _logger.info("starting connection");
-                    connection.start();
-
-                    connected = true;
-                }
-                catch (URLSyntaxException e)
-                {
-                    throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
-                }
-            }
-        }
-
-        public void disconnect() throws JMSException
-        {
-            if (connected)
-            {
-                session.commit();
-                session.close();
-                connection.close();
-                connected = false;
-                _logger.info("disconnected");
-            }
-        }
-
-        public void disconnectWithoutCommit() throws JMSException
-        {
-            if (connected)
-            {
-                session.close();
-                connection.close();
-                connected = false;
-                _logger.info("disconnected without commit");
-            }
-        }
-
-        public String getBrokerList()
-        {
-            return brokerlist;
-        }
-
-        public void setBrokerList(String brokerlist)
-        {
-            this.brokerlist = brokerlist;
-        }
-
-        public String getVirtualHost()
-        {
-            return virtualHost;
-        }
-
-        public void setVirtualHost(String virtualHost)
-        {
-            this.virtualHost = virtualHost;
-        }
-
-        public void setPrefetch(int prefetch)
-        {
-            this.prefetch = prefetch;
-        }
-
-
-        /** override as necessary */
-        public void onException(JMSException exception)
-        {
-            _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
-        }
-
-        public boolean isConnected()
-        {
-            return connected;
-        }
-
-        public Session getSession()
-        {
-            return session;
-        }
-
-        /**
-         * Put a String as a text messages, repeat n times. A null payload will result in a null message.
-         *
-         * @param queueName The queue name to put to
-         * @param payload   the content of the payload
-         * @param copies    the number of messages to put
-         *
-         * @throws javax.jms.JMSException any exception that occurs
-         */
-        public void put(String queueName, String payload, int copies) throws JMSException
-        {
-            if (!connected)
-            {
-                connect();
-            }
-
-            _logger.info("putting to queue " + queueName);
-            Queue queue = session.createQueue(queueName);
-
-            final MessageProducer sender = session.createProducer(queue);
-
-            for (int i = 0; i < copies; i++)
-            {
-                Message m = session.createTextMessage(payload + i);
-                sender.send(m);
-            }
-
-            session.commit();
-            sender.close();
-            _logger.info("put " + copies + " copies");
-        }
-
-        /**
-         * GET the top message on a queue. Consumes the message. Accepts timeout value.
-         *
-         * @param queueName   The quename to get from
-         * @param readTimeout The timeout to use
-         *
-         * @return the content of the text message if any
-         *
-         * @throws javax.jms.JMSException any exception that occured
-         */
-        public String getNextMessage(String queueName, long readTimeout) throws JMSException
-        {
-            if (!connected)
-            {
-                connect();
-            }
-
-            Queue queue = session.createQueue(queueName);
-
-            final MessageConsumer consumer = session.createConsumer(queue);
-
-            Message message = consumer.receive(readTimeout);
-            session.commit();
-            consumer.close();
-
-            String result;
-
-            // all messages we consume should be TextMessages
-            if (message instanceof TextMessage)
-            {
-                result = ((TextMessage) message).getText();
-            }
-            else if (null == message)
-            {
-                result = null;
-            }
-            else
-            {
-                _logger.info("warning: received non-text message");
-                result = message.toString();
-            }
-
-            return result;
-        }
-
-        /**
-         * GET the top message on a queue. Consumes the message.
-         *
-         * @param queueName The Queuename to get from
-         *
-         * @return The string content of the text message, if any received
-         *
-         * @throws javax.jms.JMSException any exception that occurs
-         */
-        public String getNextMessage(String queueName) throws JMSException
-        {
-            return getNextMessage(queueName, 0);
-        }
-
-        /**
-         * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
-         *
-         * @param queueName   The Queue name to consume from
-         * @param readTimeout The timeout for each consume
-         *
-         * @throws javax.jms.JMSException Any exception that occurs during the consume
-         * @throws InterruptedException   If the consume thread was interrupted during a consume.
-         */
-        public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
-        {
-            if (!connected)
-            {
-                connect();
-            }
-
-            _logger.info("consuming queue " + queueName);
-            Queue queue = session.createQueue(queueName);
-
-            final MessageConsumer consumer = session.createConsumer(queue);
-            int messagesReceived = 0;
-
-            _logger.info("consuming...");
-            while ((consumer.receive(readTimeout)) != null)
-            {
-                messagesReceived++;
-            }
-
-            session.commit();
-            consumer.close();
-            _logger.info("consumed: " + messagesReceived);
-        }
-    }
-
 }

Added: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java?view=auto&rev=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java Fri Mar  2 08:37:28 2007
@@ -0,0 +1,273 @@
+package org.apache.qpid.testutil;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+/**
+ * This shouldn't be used for making production connections,
+ * as it has some unusual operations.
+ *
+ * Supplied by customer
+ */
+public class QpidClientConnection implements ExceptionListener
+{
+    private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
+
+
+        private boolean transacted = true;
+        private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+        private Connection connection;
+
+        private String virtualHost;
+        private String brokerlist;
+        private int prefetch;
+        protected Session session;
+        protected boolean connected;
+
+        public QpidClientConnection(String BROKER)
+        {
+            super();
+            setVirtualHost("/test");
+            setBrokerList(BROKER);
+            setPrefetch(5000);
+        }
+
+
+        public void connect() throws JMSException
+        {
+            if (!connected)
+            {
+                /*
+                * amqp://[user:pass@][clientid]/virtualhost?
+                * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+                * [&failover='method[?option='value'[&option='value']]']
+                * [&option='value']"
+                */
+                String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+                try
+                {
+                    AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+                    _logger.info("connecting to Qpid :" + brokerUrl);
+                    connection = factory.createConnection();
+
+                    // register exception listener
+                    connection.setExceptionListener(this);
+
+                    session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+                    _logger.info("starting connection");
+                    connection.start();
+
+                    connected = true;
+                }
+                catch (URLSyntaxException e)
+                {
+                    throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+                }
+            }
+        }
+
+        public void disconnect() throws JMSException
+        {
+            if (connected)
+            {
+                session.commit();
+                session.close();
+                connection.close();
+                connected = false;
+                _logger.info("disconnected");
+            }
+        }
+
+        public void disconnectWithoutCommit() throws JMSException
+        {
+            if (connected)
+            {
+                session.close();
+                connection.close();
+                connected = false;
+                _logger.info("disconnected without commit");
+            }
+        }
+
+        public String getBrokerList()
+        {
+            return brokerlist;
+        }
+
+        public void setBrokerList(String brokerlist)
+        {
+            this.brokerlist = brokerlist;
+        }
+
+        public String getVirtualHost()
+        {
+            return virtualHost;
+        }
+
+        public void setVirtualHost(String virtualHost)
+        {
+            this.virtualHost = virtualHost;
+        }
+
+        public void setPrefetch(int prefetch)
+        {
+            this.prefetch = prefetch;
+        }
+
+
+        /** override as necessary */
+        public void onException(JMSException exception)
+        {
+            _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+        }
+
+        public boolean isConnected()
+        {
+            return connected;
+        }
+
+        public Session getSession()
+        {
+            return session;
+        }
+
+        /**
+         * Put a String as a text message, repeated n times. A null payload will result in a null message.
+         *
+         * @param queueName The queue name to put to
+         * @param payload   the content of the payload
+         * @param copies    the number of messages to put
+         *
+         * @throws javax.jms.JMSException any exception that occurs
+         */
+        public void put(String queueName, String payload, int copies) throws JMSException
+        {
+            if (!connected)
+            {
+                connect();
+            }
+
+            _logger.info("putting to queue " + queueName);
+            Queue queue = session.createQueue(queueName);
+
+            final MessageProducer sender = session.createProducer(queue);
+
+            for (int i = 0; i < copies; i++)
+            {
+                Message m = session.createTextMessage(payload + i);
+                sender.send(m);
+            }
+
+            session.commit();
+            sender.close();
+            _logger.info("put " + copies + " copies");
+        }
+
+        /**
+         * GET the top message on a queue. Consumes the message. Accepts timeout value.
+         *
+         * @param queueName   The queue name to get from
+         * @param readTimeout The timeout to use
+         *
+         * @return the content of the text message if any
+         *
+         * @throws javax.jms.JMSException any exception that occurred
+         */
+        public String getNextMessage(String queueName, long readTimeout) throws JMSException
+        {
+            if (!connected)
+            {
+                connect();
+            }
+
+            Queue queue = session.createQueue(queueName);
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            Message message = consumer.receive(readTimeout);
+            session.commit();
+            consumer.close();
+
+            String result;
+
+            // all messages we consume should be TextMessages
+            if (message instanceof TextMessage)
+            {
+                result = ((TextMessage) message).getText();
+            }
+            else if (null == message)
+            {
+                result = null;
+            }
+            else
+            {
+                _logger.info("warning: received non-text message");
+                result = message.toString();
+            }
+
+            return result;
+        }
+
+        /**
+         * GET the top message on a queue. Consumes the message.
+         *
+         * @param queueName The queue name to get from
+         *
+         * @return The string content of the text message, if any received
+         *
+         * @throws javax.jms.JMSException any exception that occurs
+         */
+        public String getNextMessage(String queueName) throws JMSException
+        {
+            return getNextMessage(queueName, 0);
+        }
+
+        /**
+         * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+         *
+         * @param queueName   The Queue name to consume from
+         * @param readTimeout The timeout for each consume
+         *
+         * @throws javax.jms.JMSException Any exception that occurs during the consume
+         * @throws InterruptedException   If the consume thread was interrupted during a consume.
+         */
+        public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+        {
+            if (!connected)
+            {
+                connect();
+            }
+
+            _logger.info("consuming queue " + queueName);
+            Queue queue = session.createQueue(queueName);
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+            int messagesReceived = 0;
+
+            _logger.info("consuming...");
+            while ((consumer.receive(readTimeout)) != null)
+            {
+                messagesReceived++;
+            }
+
+            session.commit();
+            consumer.close();
+            _logger.info("consumed: " + messagesReceived);
+        }
+    }
+

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java?view=auto&rev=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java Fri Mar  2 08:37:28 2007
@@ -0,0 +1,31 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+public class AMQTimeoutException extends AMQException
+{
+    public AMQTimeoutException(String message)
+    {
+        super(AMQConstant.REQUEST_TIMEOUT.getCode(), message);
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Fri Mar  2 08:37:28 2007
@@ -82,6 +82,8 @@
 
     public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
 
+    public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true);    
+
     public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
 
     public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?view=diff&rev=513835&r1=513834&r2=513835
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Fri Mar  2 08:37:28 2007
@@ -26,13 +26,11 @@
 
 import javax.management.Notification;
 
-/**
- * This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters
- */
+/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
 public class AMQQueueAlertTest extends TestCase
 {
-    private final static int MAX_MESSAGE_COUNT = 50; 
-    private final static long MAX_MESSAGE_AGE = 2000;   // 2 sec
+    private final static int MAX_MESSAGE_COUNT = 50;
+    private final static long MAX_MESSAGE_AGE = 250;   // 0.25 sec
     private final static long MAX_MESSAGE_SIZE = 2000;  // 2 KB
     private final static long MAX_QUEUE_DEPTH = 10000;  // 10 KB
     private AMQQueue _queue;
@@ -42,19 +40,20 @@
 
     /**
      * Tests if the alert gets thrown when message count increases the threshold limit
+     *
      * @throws Exception
      */
     public void testMessageCountAlert() throws Exception
     {
         _queue = new AMQQueue("testQueue1", false, "AMQueueAlertTest", false, _queueRegistry);
-        _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
 
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
 
         sendMessages(MAX_MESSAGE_COUNT, 256l);
         assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
 
-        Notification lastNotification= _queueMBean.getLastNotification();
+        Notification lastNotification = _queueMBean.getLastNotification();
         assertNotNull(lastNotification);
 
         String notificationMsg = lastNotification.getMessage();
@@ -63,19 +62,20 @@
 
     /**
      * Tests if the Message Size alert gets thrown when message of higher than threshold limit is sent
+     *
      * @throws Exception
      */
     public void testMessageSizeAlert() throws Exception
     {
         _queue = new AMQQueue("testQueue2", false, "AMQueueAlertTest", false, _queueRegistry);
-        _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
         _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
 
         sendMessages(1, MAX_MESSAGE_SIZE * 2);
         assertTrue(_queueMBean.getMessageCount() == 1);
 
-        Notification lastNotification= _queueMBean.getLastNotification();
+        Notification lastNotification = _queueMBean.getLastNotification();
         assertNotNull(lastNotification);
 
         String notificationMsg = lastNotification.getMessage();
@@ -84,12 +84,13 @@
 
     /**
      * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value
+     *
      * @throws Exception
      */
     public void testQueueDepthAlert() throws Exception
     {
         _queue = new AMQQueue("testQueue3", false, "AMQueueAlertTest", false, _queueRegistry);
-        _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
         _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
 
@@ -98,7 +99,7 @@
             sendMessages(1, MAX_MESSAGE_SIZE);
         }
 
-        Notification lastNotification= _queueMBean.getLastNotification();
+        Notification lastNotification = _queueMBean.getLastNotification();
         assertNotNull(lastNotification);
 
         String notificationMsg = lastNotification.getMessage();
@@ -106,23 +107,27 @@
     }
 
     /**
-     * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than
-     * threshold value of message age
+     * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of
+     * message age
+     *
      * @throws Exception
      */
     public void testMessageAgeAlert() throws Exception
     {
         _queue = new AMQQueue("testQueue4", false, "AMQueueAlertTest", false, _queueRegistry);
-        _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
         _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
 
         sendMessages(1, MAX_MESSAGE_SIZE);
-        Thread.sleep(MAX_MESSAGE_AGE);
+
+        // Ensure message sits on queue long enough to age.
+        Thread.sleep(MAX_MESSAGE_AGE * 2);
+
         sendMessages(1, MAX_MESSAGE_SIZE);
         assertTrue(_queueMBean.getMessageCount() == 2);
 
-        Notification lastNotification= _queueMBean.getLastNotification();
+        Notification lastNotification = _queueMBean.getLastNotification();
         assertNotNull(lastNotification);
 
         String notificationMsg = lastNotification.getMessage();
@@ -133,7 +138,7 @@
     {
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Establish some way to determine the version for the test.
-        BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
+        BasicPublishBody publish = new BasicPublishBody((byte) 8, (byte) 0);
         publish.immediate = immediate;
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.bodySize = size;   // in bytes