You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/06/19 16:53:31 UTC

svn commit: r548753 - in /incubator/qpid/trunk/qpid/java: broker/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/failover/ client/src/main/java/org/apache/qpid/client/protocol/ common/src/main/java/org/apache/q...

Author: ritchiem
Date: Tue Jun 19 07:53:29 2007
New Revision: 548753

URL: http://svn.apache.org/viewvc?view=rev&rev=548753
Log:
Fixed outstanding merge issues and updates to trunk code that were required for slf4j.

Modified:
    incubator/qpid/trunk/qpid/java/broker/pom.xml
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
    incubator/qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.tmpl
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java

Modified: incubator/qpid/trunk/qpid/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/pom.xml?view=diff&rev=548753&r1=548752&r2=548753
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/pom.xml Tue Jun 19 07:53:29 2007
@@ -49,6 +49,18 @@
         </dependency>
 
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>  
+            <version>1.4.0</version> 
+        </dependency>
+
+        <dependency>  
+            <groupId>org.slf4j</groupId> 
+            <artifactId>slf4j-log4j12</artifactId>  
+            <version>1.4.0</version>  
+        </dependency>
+
+        <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
         </dependency>
@@ -64,7 +76,7 @@
         </dependency>
 
         <!-- Test Dependencies -->
-        <dependency>  
+        <dependency>
             <groupId>org.slf4j</groupId> 
             <artifactId>slf4j-log4j12</artifactId>  
             <version>1.4.0</version>  
@@ -180,8 +192,8 @@
                                 <tasks>
 
                                     <condition property="broker.dir" 
-                                             else="${user.dir}${file.separator}broker"
-                                             value="${user.dir}">
+                                               else="${user.dir}${file.separator}broker"
+                                               value="${user.dir}">
                                         <contains string="${user.dir}" substring="broker" />
                                     </condition>
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=548753&r1=548752&r2=548753
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Jun 19 07:53:29 2007
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQConnectionFailureException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
@@ -48,6 +47,10 @@
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.url.URLSyntaxException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
 import javax.jms.Destination;
@@ -65,6 +68,7 @@
 import javax.naming.Reference;
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
+
 import java.io.IOException;
 import java.net.ConnectException;
 import java.nio.channels.UnresolvedAddressException;
@@ -81,7 +85,7 @@
 
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
-    private static final Logger _logger = Logger.getLogger(AMQConnection.class);
+    private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
 
     private AtomicInteger _idFactory = new AtomicInteger(0);
 
@@ -237,8 +241,8 @@
     }
 
     /**
-     * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception was
-     * thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
+     * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
+     *       was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
      */
     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
     {
@@ -1067,6 +1071,7 @@
      */
     public void exceptionReceived(Throwable cause)
     {
+
         if (_logger.isDebugEnabled())
         {
             _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=548753&r1=548752&r2=548753
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jun 19 07:53:29 2007
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
@@ -74,6 +74,9 @@
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -98,6 +101,7 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+
 import java.io.Serializable;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -109,14 +113,30 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ * </table>
+ *
+ * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For
+ *       example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
+ *       fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
+ *       the fail-over process, the retry handler could be used to automatically retry the operation once the connection
+ *       has been reestablished. All fail-over protected operations should be placed in private methods, with
+ *       FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
+ *       fail-over process sets a nowait flag and uses an async method call instead.
+ *
+ * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
+ *       after looking at worse bottlenecks first.
  */
 public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
     /** Used for debugging. */
-    private static final Logger _logger = Logger.getLogger(AMQSession.class);
+    private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
     /** Used for debugging in the dispatcher. */
-    private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+    private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class);
 
     /** The default maximum number of prefetched message at which to suspend the channel. */
     public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
@@ -190,8 +210,8 @@
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
 
     /**
-     * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked up
-     * in the {@link #_subscriptions} map.
+     * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
+     * up in the {@link #_subscriptions} map.
      */
     private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
             new ConcurrentHashMap<BasicMessageConsumer, String>();
@@ -253,8 +273,8 @@
     private boolean _suspended;
 
     /**
-     * Used to protect the suspension of this session, so that critical code can be executed during suspension, without
-     * the session being resumed by other threads.
+     * Used to protect the suspension of this session, so that critical code can be executed during suspension,
+     * without the session being resumed by other threads.
      */
     private final Object _suspensionLock = new Object();
 
@@ -350,12 +370,12 @@
     /**
      * Creates a new session on a connection with the default message factory factory.
      *
-     * @param con                 The connection on which to create the session.
-     * @param channelId           The unique identifier for the session.
-     * @param transacted          Indicates whether or not the session is transactional.
-     * @param acknowledgeMode     The acknoledgement mode for the session.
-     * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
-     * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
+     * @param con                     The connection on which to create the session.
+     * @param channelId               The unique identifier for the session.
+     * @param transacted              Indicates whether or not the session is transactional.
+     * @param acknowledgeMode         The acknowledgement mode for the session.
+     * @param defaultPrefetchHigh     The maximum number of messages to prefetched before suspending the session.
+     * @param defaultPrefetchLow      The number of prefetched messages at which to resume the session.
      */
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
                int defaultPrefetchLow)
@@ -364,6 +384,18 @@
              defaultPrefetchLow);
     }
 
+    // ===== JMS Session methods.
+
+    /**
+     * Closes the session with no timeout.
+     *
+     * @throws JMSException If the JMS provider fails to close the session due to some internal error.
+     */
+    public void close() throws JMSException
+    {
+        close(-1);
+    }
+
     /**
      * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
      *
@@ -416,7 +448,9 @@
      * @param exchangeName The exchange to bind the queue on.
      *
      * @throws AMQException If the queue cannot be bound for any reason.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
+     *
      * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
      */
     public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
@@ -444,30 +478,24 @@
     }
 
     /**
-     * Closes the session with no timeout.
-     *
-     * @throws JMSException If the JMS provider fails to close the session due to some internal error.
-     */
-    public void close() throws JMSException
-    {
-        close(-1);
-    }
 
-    /**
      * Closes the session.
      *
-     * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close the
-     * channel. This is because the channel is marked as closed before the request to close it is made, so the fail-over
-     * should not re-open it.
+     * <p/>Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close
+     * the channel. This is because the channel is marked as closed before the request to close it is made, so the
+     * fail-over should not re-open it.
      *
      * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
      *
      * @throws JMSException If the JMS provider fails to close the session due to some internal error.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
-     * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be re-opened. May
-     * need to examine this more carefully.
-     * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, because
-     * the failover process sends the failover event before acquiring the mutex itself.
+     *
+     * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
+     *       re-opened. May need to examine this more carefully.
+     *
+     * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
+     *       because the failover process sends the failover event before acquiring the mutex itself.
      */
     public void close(long timeout) throws JMSException
     {
@@ -556,12 +584,13 @@
      * Commits all messages done in this transaction and releases any locks currently held.
      *
      * <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the
-     * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. The
-     * client will be unable to determine whether or not the commit actually happened on the broker in this case.
+     * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
+     * The client will be unable to determine whether or not the commit actually happened on the broker in this case.
      *
      * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
      *                      not mean that the commit is known to have failed, merely that it is not known whether it
      *                      failed or not.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void commit() throws JMSException
@@ -917,6 +946,7 @@
      * @param exclusive  Flag to indicate that the queue is exclusive to this client.
      *
      * @throws AMQException If the queue cannot be declared for any reason.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
@@ -1257,9 +1287,12 @@
      *
      * <p/>Restarting a session causes it to take the following actions:
      *
-     * <ul> <li>Stop message delivery.</li> <li>Mark all messages that might have been delivered but not acknowledged as
-     * "redelivered". <li>Restart the delivery sequence including all unacknowledged messages that had been previously
-     * delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.</li> </ul>
+     * <ul>
+     * <li>Stop message delivery.</li>
+     * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
+     * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
+     *     Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
+     * </ul>
      *
      * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
      * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible
@@ -1373,12 +1406,13 @@
      * Commits all messages done in this transaction and releases any locks currently held.
      *
      * <p/>If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the
-     * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. The
-     * client will be unable to determine whether or not the rollback actually happened on the broker in this case.
+     * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
+     * The client will be unable to determine whether or not the rollback actually happened on the broker in this case.
      *
      * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
      *                      not mean that the rollback is known to have failed, merely that it is not known whether it
      *                      failed or not.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void rollback() throws JMSException
@@ -1650,6 +1684,7 @@
      * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
      *
      * @throws JMSException If the query fails for any reason.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
@@ -1722,9 +1757,10 @@
      * Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
      *
      * @throws AMQException If the session cannot be started for any reason.
+     *
      * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
-     * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages for
-     * each subsequent call to flow.. only need to do this if we have called stop.
+     *       FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
+     *       for each subsequent call to flow.. only need to do this if we have called stop.
      */
     void start() throws AMQException
     {
@@ -2084,6 +2120,7 @@
      * @param nowait
      *
      * @throws AMQException If the exchange cannot be declared for any reason.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void declareExchange(final AMQShortString name, final AMQShortString type,
@@ -2128,7 +2165,9 @@
      *         the client.
      *
      * @throws AMQException If the queue cannot be declared for any reason.
+     *
      * @todo Verify the destiation is valid or throw an exception.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
@@ -2172,6 +2211,7 @@
      * @param queueName The name of the queue to delete.
      *
      * @throws JMSException If the queue could not be deleted for any reason.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void deleteQueue(final AMQShortString queueName) throws JMSException
@@ -2460,6 +2500,7 @@
      *                should be unsuspended.
      *
      * @throws AMQException If the session cannot be suspended for any reason.
+     *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void suspendChannel(boolean suspend) throws AMQException // , FailoverException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=548753&r1=548752&r2=548753
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Tue Jun 19 07:53:29 2007
@@ -20,14 +20,15 @@
  */
 package org.apache.qpid.client.failover;
 
-import org.apache.log4j.Logger;
-
 import org.apache.mina.common.IoSession;
 
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQStateManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.CountDownLatch;
 
 /**
@@ -78,7 +79,7 @@
 public class FailoverHandler implements Runnable
 {
     /** Used for debugging. */
-    private static final Logger _logger = Logger.getLogger(FailoverHandler.class);
+    private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class);
 
     /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
     private final IoSession _session;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=548753&r1=548752&r2=548753
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Jun 19 07:53:29 2007
@@ -20,12 +20,12 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.apache.log4j.Logger;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.filter.SSLFilter;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
+
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
@@ -56,6 +56,9 @@
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.ssl.SSLContextFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Iterator;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
@@ -63,10 +66,10 @@
 /**
  * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
  * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
- * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the event
- * on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, expressed in
- * terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in terms of "message
- * received" and so on.
+ * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
+ * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
+ * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
+ * terms of "message received" and so on.
  *
  * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
  * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
@@ -76,52 +79,56 @@
  * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
  * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
  * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
- * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA
- * sessions in the event of failover. See below for more information about this.
+ * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
+ * in the event of failover. See below for more information about this.
  *
  * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
- * attributes. A more convenient, type-safe, container for session data is provided in the form of {@link
- * AMQProtocolSession}.
+ * attributes. A more convenient, type-safe, container for session data is provided in the form of
+ * {@link AMQProtocolSession}.
  *
  * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
- * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe
- * wrapper as described above). This event handler is different, because dealing with failover complicates things. To
- * the end client of an AMQConnection, a failed over connection is still handled through the same connection instance,
- * but behind the scenes a new transport connection, and MINA session will have been created. The MINA session object
- * cannot be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the
- * old connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
+ * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
+ * as described above). This event handler is different, because dealing with failover complicates things. To the
+ * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
+ * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot
+ * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
+ * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
  * and the protocol session data is held outside of the MINA IOSession.
  *
- * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through. The
- * filter chain is set up as a stack of event handers that perform the following functions (working upwards from the
- * network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, optionally
- * handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
+ * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through.
+ * The filter chain is set up as a stack of event handlers that perform the following functions (working upwards from
+ * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
+ * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
  *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Create the
- * filter chain to filter this handlers events. <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link
- * SSLFilter}, {@link ReadWriteThreadModel}.
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Create the filter chain to filter this handlers events.
+ *     <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
  *
- * <tr><td> Maintain fail-over state. <tr><td> </table>
+ * <tr><td> Maintain fail-over state.
+ * <tr><td>
+ * </table>
  *
  * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec filter
- * before it mean not doing the read/write asynchronously but in the main filter thread?
+ *       async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ *       anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ *       filter before it mean not doing the read/write asynchronously but in the main filter thread?
+ *
  * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of AMQProtocolSesssion
- * and AMQConnection will be the same, so if there is high cohesion between them, they could be merged, although there
- * is sense in keeping the session model seperate. Will clarify things by having data held per protocol handler, per
- * protocol session, per network connection, per channel, in seperate classes, so that lifecycles of the fields match
- * lifecycles of their containing objects.
+ *       failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ *       AMQProtocolSession and AMQConnection will be the same, so if there is high cohesion between them, they could
+ *       be merged, although there is sense in keeping the session model separate. Will clarify things by having data
+ *       held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
+ *       that lifecycles of the fields match lifecycles of their containing objects.
  */
 public class AMQProtocolHandler extends IoHandlerAdapter
 {
     /** Used for debugging. */
-    private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
+    private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
 
     /**
-     * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
-     * and protocol handler instances.
+     * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
+     * instances and protocol handler instances.
      */
     private AMQConnection _connection;
 
@@ -165,8 +172,8 @@
 
     /**
      * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
-     * session, which filters the events handled by this handler. The filter chain consists of, handing off events to an
-     * asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
+     * session, which filters the events handled by this handler. The filter chain consists of: handing off events
+     * to an asynchronous thread pool, optionally encoding/decoding SSL, and encoding/decoding AMQP.
      *
      * @param session The MINA session.
      *
@@ -215,8 +222,8 @@
 
     /**
      * Called when the network connection is closed. This can happen, either because the client explicitly requested
-     * that the connection be closed, in which case nothing is done, or because the connection died. In the case where
-     * the connection died, an attempt to failover automatically to a new connection may be started. The failover
+     * that the connection be closed, in which case nothing is done, or because the connection died. In the case
+     * where the connection died, an attempt to failover automatically to a new connection may be started. The failover
      * process will be started, provided that it is the clients policy to allow failover, and provided that a failover
      * has not already been started or failed.
      *
@@ -227,7 +234,7 @@
      * @param session The MINA session.
      *
      * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
-     * not otherwise? The above comment doesn't make that clear.
+     *       not otherwise? The above comment doesn't make that clear.
      */
     public void sessionClosed(IoSession session)
     {
@@ -413,10 +420,6 @@
                         {
                             final AMQMethodListener listener = (AMQMethodListener) it.next();
                             wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
-                        }
-                        if (!wasAnyoneInterested)
-                        {
-                            throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:" + _frameListeners, null);
                         }
                     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java?view=diff&rev=548753&r1=548752&r2=548753
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java Tue Jun 19 07:53:29 2007
@@ -1,13 +1,13 @@
 package org.apache.qpid.util.concurrent;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data.
@@ -44,7 +44,6 @@
 {
     /** Used for logging. */
     private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class);
-                                                       
 
     /** Holds a reference to the queue implementation that holds the buffer. */
     Queue<SynchRecordImpl<E>> buffer;

Modified: incubator/qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.tmpl?view=diff&rev=548753&r1=548752&r2=548753
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.tmpl (original)
+++ incubator/qpid/trunk/qpid/java/common/templates/model/MethodRegistryClass.tmpl Tue Jun 19 07:53:29 2007
@@ -1,160 +1,159 @@
-&{MainRegistry.java}
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by ${GENERATOR} - do not modify.
- * Supported AMQP versions:
-%{VLIST} *   ${major}-${minor}
- */
- 
-package org.apache.qpid.framing;
-
-import java.util.HashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.mina.common.ByteBuffer;
-
-public class MainRegistry
-{
-	private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>();
-
-	
-	private static final Logger _log = LoggerFactory.getLogger(MainRegistry.class);
-
-	
-
-    private static final int DEFAULT_MINOR_VERSION_COUNT = 10;
-    private static final int DEFAULT_MAJOR_VERSION_COUNT = 10;
-    
-    private static VersionSpecificRegistry[][] _specificRegistries = new VersionSpecificRegistry[DEFAULT_MAJOR_VERSION_COUNT][];
-    	
-    static
-    {
-%{CLIST}	${reg_map_put_method}
-
-        configure();
-    }
-    
-    public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
-        throws AMQFrameDecodingException
-    {
-		VersionSpecificRegistry registry = getVersionSpecificRegistry(major, minor);
-        AMQMethodBodyInstanceFactory bodyFactory = registry.getMethodBody(classID,methodID);
-		
-        if (bodyFactory == null)
-        {
-            throw new AMQFrameDecodingException(null,
-                "Unable to find a suitable decoder for class " + classID + " and method " +
-                methodID + " in AMQP version " + major + "-" + minor + ".", null);
-        }
-        return bodyFactory.newInstance(major, minor, in, size);
-
-	    
-    }
-	
-    
-	public static VersionSpecificRegistry getVersionSpecificRegistry(ProtocolVersion pv)
-	{
-        return getVersionSpecificRegistry(pv.getMajorVersion(), pv.getMinorVersion());
-    }
-	public static VersionSpecificRegistry getVersionSpecificRegistry(byte major, byte minor)
-	{
-		try
-		{
-			return _specificRegistries[(int)major][(int)minor];
-		}
-		catch (IndexOutOfBoundsException e)
-		{
-			return null;
-		}
-		catch (NullPointerException e)
-		{
-			return null;
-		}
-		
-		
-	}
-    
-	private static VersionSpecificRegistry addVersionSpecificRegistry(byte major, byte minor)
-	{
-		VersionSpecificRegistry[][] registries = _specificRegistries;
-		if(major >= registries.length)
-		{
-			_specificRegistries = new VersionSpecificRegistry[(int)major + 1][];
-			System.arraycopy(registries, 0, _specificRegistries, 0, registries.length);
-			registries = _specificRegistries;
-		}
-		if(registries[major] == null)
-		{
-			registries[major] = new VersionSpecificRegistry[ minor >= DEFAULT_MINOR_VERSION_COUNT ? minor + 1 : DEFAULT_MINOR_VERSION_COUNT ];
-		}
-		else if(registries[major].length <= minor)
-		{
-			VersionSpecificRegistry[] minorArray = registries[major];
-			registries[major] = new VersionSpecificRegistry[ minor + 1 ];
-			System.arraycopy(minorArray, 0, registries[major], 0, minorArray.length);
-			
-		}
-		
-		VersionSpecificRegistry newRegistry = new VersionSpecificRegistry(major,minor);
-		
-		registries[major][minor] = newRegistry;
-		
-		return newRegistry;
-	}
-		
-	private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory )
-	{
-		VersionSpecificRegistry registry = getVersionSpecificRegistry(major,minor);
-		if(registry == null)
-		{
-			registry = addVersionSpecificRegistry(major,minor);
-			
-		}
-		
-		registry.registerMethod(classID, methodID, instanceFactory);
-        
-    }
-
-	
-    private static void configure()
-    {
-        for(int i = 0 ; i < _specificRegistries.length; i++)
-        {
-            VersionSpecificRegistry[] registries = _specificRegistries[i];
-            if(registries != null)
-            {
-                for(int j = 0 ; j < registries.length; j++)
-                {
-                    VersionSpecificRegistry registry = registries[j];
-                    
-                    if(registry != null)
-                    {
-                        registry.configure();
-                    }
-                }
-            }
-        }
-        
-    }
-    
-}
+&{MainRegistry.java}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by ${GENERATOR} - do not modify.
+ * Supported AMQP versions:
+%{VLIST} *   ${major}-${minor}
+ */
+ 
+package org.apache.qpid.framing;
+
+import java.util.HashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.mina.common.ByteBuffer;
+
+public class MainRegistry
+{
+	private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>();
+
+	
+    private static final Logger _log = LoggerFactory.getLogger(MainRegistry.class);
+	
+
+    private static final int DEFAULT_MINOR_VERSION_COUNT = 10;
+    private static final int DEFAULT_MAJOR_VERSION_COUNT = 10;
+    
+    private static VersionSpecificRegistry[][] _specificRegistries = new VersionSpecificRegistry[DEFAULT_MAJOR_VERSION_COUNT][];
+    	
+    static
+    {
+%{CLIST}	${reg_map_put_method}
+
+        configure();
+    }
+    
+    public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
+        throws AMQFrameDecodingException
+    {
+		VersionSpecificRegistry registry = getVersionSpecificRegistry(major, minor);
+        AMQMethodBodyInstanceFactory bodyFactory = registry.getMethodBody(classID,methodID);
+		
+        if (bodyFactory == null)
+        {
+            throw new AMQFrameDecodingException(null,
+                "Unable to find a suitable decoder for class " + classID + " and method " +
+                methodID + " in AMQP version " + major + "-" + minor + ".", null);
+        }
+        return bodyFactory.newInstance(major, minor, in, size);
+
+	    
+    }
+	
+    
+	public static VersionSpecificRegistry getVersionSpecificRegistry(ProtocolVersion pv)
+	{
+        return getVersionSpecificRegistry(pv.getMajorVersion(), pv.getMinorVersion());
+    }
+	public static VersionSpecificRegistry getVersionSpecificRegistry(byte major, byte minor)
+	{
+		try
+		{
+			return _specificRegistries[(int)major][(int)minor];
+		}
+		catch (IndexOutOfBoundsException e)
+		{
+			return null;
+		}
+		catch (NullPointerException e)
+		{
+			return null;
+		}
+		
+		
+	}
+    
+	private static VersionSpecificRegistry addVersionSpecificRegistry(byte major, byte minor)
+	{
+		VersionSpecificRegistry[][] registries = _specificRegistries;
+		if(major >= registries.length)
+		{
+			_specificRegistries = new VersionSpecificRegistry[(int)major + 1][];
+			System.arraycopy(registries, 0, _specificRegistries, 0, registries.length);
+			registries = _specificRegistries;
+		}
+		if(registries[major] == null)
+		{
+			registries[major] = new VersionSpecificRegistry[ minor >= DEFAULT_MINOR_VERSION_COUNT ? minor + 1 : DEFAULT_MINOR_VERSION_COUNT ];
+		}
+		else if(registries[major].length <= minor)
+		{
+			VersionSpecificRegistry[] minorArray = registries[major];
+			registries[major] = new VersionSpecificRegistry[ minor + 1 ];
+			System.arraycopy(minorArray, 0, registries[major], 0, minorArray.length);
+			
+		}
+		
+		VersionSpecificRegistry newRegistry = new VersionSpecificRegistry(major,minor);
+		
+		registries[major][minor] = newRegistry;
+		
+		return newRegistry;
+	}
+		
+	private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory )
+	{
+		VersionSpecificRegistry registry = getVersionSpecificRegistry(major,minor);
+		if(registry == null)
+		{
+			registry = addVersionSpecificRegistry(major,minor);
+			
+		}
+		
+		registry.registerMethod(classID, methodID, instanceFactory);
+        
+    }
+
+	
+    private static void configure()
+    {
+        for(int i = 0 ; i < _specificRegistries.length; i++)
+        {
+            VersionSpecificRegistry[] registries = _specificRegistries[i];
+            if(registries != null)
+            {
+                for(int j = 0 ; j < registries.length; j++)
+                {
+                    VersionSpecificRegistry registry = registries[j];
+                    
+                    if(registry != null)
+                    {
+                        registry.configure();
+                    }
+                }
+            }
+        }
+        
+    }
+    
+}

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java?view=diff&rev=548753&r1=548752&r2=548753
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java Tue Jun 19 07:53:29 2007
@@ -82,7 +82,7 @@
      *
      * @throws Exception on error
      */
-    public void testUntilFailure() throws Exception
+    public void testUntilFailureTransient() throws Exception
     {
         int copies = 0;
         int total = 0;
@@ -102,7 +102,7 @@
      *
      * @throws Exception on error
      */
-    public void testUntilFailureWithDelays() throws Exception
+    public void testUntilFailureWithDelaysTransient() throws Exception
     {
         int copies = 0;
         int total = 0;
@@ -137,7 +137,7 @@
             _logger.info("Running testUntilFailure");
             try
             {
-                he.testUntilFailure();
+                he.testUntilFailureTransient();
             }
             catch (FailoverException fe)
             {
@@ -186,7 +186,7 @@
             _logger.info("Running testUntilFailure");
             try
             {
-                he.testUntilFailureWithDelays();
+                he.testUntilFailureWithDelaysTransient();
             }
             catch (FailoverException fe)
             {