You are viewing a plain-text version of this content; the canonical, hyperlinked version is available from the original mailing-list archive.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/06/15 18:28:54 UTC

svn commit: r547730 [4/9] - in /incubator/qpid/trunk/qpid: ./ java/ java/broker/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ java/broker/src/main/java/org/apache/qpid/server/queue/ ...

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.client.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.ConnectionTuneParameters;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
@@ -34,9 +33,12 @@
 import org.apache.qpid.framing.ConnectionTuneOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class ConnectionTuneMethodHandler implements StateAwareMethodListener
 {
-    private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class);
+    private static final Logger _logger = LoggerFactory.getLogger(ConnectionTuneMethodHandler.class);
 
     private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler();
 
@@ -46,10 +48,10 @@
     }
 
     protected ConnectionTuneMethodHandler()
-    {
-    }
+    { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+        throws AMQException
     {
         _logger.debug("ConnectionTune frame received");
         ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
@@ -60,38 +62,36 @@
             params = new ConnectionTuneParameters();
         }
 
-        params.setFrameMax(frame.frameMax);        
+        params.setFrameMax(frame.frameMax);
         params.setChannelMax(frame.channelMax);
         params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
         protocolSession.setConnectionTuneParameters(params);
 
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
-        protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params,frame.getMajor(), frame.getMinor()));
+        protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params, frame.getMajor(), frame.getMinor()));
 
         String host = protocolSession.getAMQConnection().getVirtualHost();
         AMQShortString virtualHost = new AMQShortString("/" + host);
 
-
-        protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true,frame.getMajor(), frame.getMinor()));
+        protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true, frame.getMajor(),
+                frame.getMinor()));
     }
 
-    protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
+    protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities,
+        boolean insist, byte major, byte minor)
     {
         // Be aware of possible changes to parameter order as versions change.
-        return ConnectionOpenBody.createAMQFrame(channel,
-            major, minor,	// AMQP version (major, minor)
-            capabilities,	// capabilities
-            insist,	// insist
-            path);	// virtualHost
+        return ConnectionOpenBody.createAMQFrame(channel, major, minor, // AMQP version (major, minor)
+                capabilities, // capabilities
+                insist, // insist
+                path); // virtualHost
     }
 
     protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor)
     {
         // Be aware of possible changes to parameter order as versions change.
-        return ConnectionTuneOkBody.createAMQFrame(channel,
-            major, minor,
-            params.getChannelMax(),	// channelMax
-            params.getFrameMax(),	// frameMax
-            params.getHeartbeat());	// heartbeat
+        return ConnectionTuneOkBody.createAMQFrame(channel, major, minor, params.getChannelMax(), // channelMax
+                params.getFrameMax(), // frameMax
+                params.getHeartbeat()); // heartbeat
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java Fri Jun 15 09:28:46 2007
@@ -17,7 +17,6 @@
  */
 package org.apache.qpid.client.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
@@ -25,32 +24,33 @@
 import org.apache.qpid.framing.ExchangeBoundOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * @author Apache Software Foundation
  */
 public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
 {
-     private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class);
-     private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
-
-     public static ExchangeBoundOkMethodHandler getInstance()
-     {
-         return _instance;
-     }
-
-     private ExchangeBoundOkMethodHandler()
-     {
-     }
+    private static final Logger _logger = LoggerFactory.getLogger(ExchangeBoundOkMethodHandler.class);
+    private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
 
-     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
-     {
-         if (_logger.isDebugEnabled())
-         {
+    public static ExchangeBoundOkMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private ExchangeBoundOkMethodHandler()
+    { }
+
+    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+        throws AMQException
+    {
+        if (_logger.isDebugEnabled())
+        {
             ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
-            _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " +
-                          body.replyText);
-         }
-     }
+            _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: "
+                + body.replyText);
+        }
+    }
 }
-
-

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Fri Jun 15 09:28:46 2007
@@ -17,7 +17,6 @@
  */
 package org.apache.qpid.client.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
@@ -25,31 +24,32 @@
 import org.apache.qpid.framing.QueueDeleteOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * @author Apache Software Foundation
  */
 public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
 {
-     private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class);
-     private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
-
-     public static QueueDeleteOkMethodHandler getInstance()
-     {
-         return _instance;
-     }
-
-     private QueueDeleteOkMethodHandler()
-     {
-     }
+    private static final Logger _logger = LoggerFactory.getLogger(QueueDeleteOkMethodHandler.class);
+    private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
 
-     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
-     {
-         if (_logger.isDebugEnabled())
-         {
+    public static QueueDeleteOkMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private QueueDeleteOkMethodHandler()
+    { }
+
+    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+        throws AMQException
+    {
+        if (_logger.isDebugEnabled())
+        {
             QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
             _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
-         }
-     }
+        }
+    }
 }
-
-

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Fri Jun 15 09:28:46 2007
@@ -20,18 +20,10 @@
  */
 package org.apache.qpid.client.message;
 
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.UUID;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-
 import org.apache.commons.collections.map.ReferenceMap;
+
 import org.apache.mina.common.ByteBuffer;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.*;
 import org.apache.qpid.framing.AMQShortString;
@@ -41,6 +33,16 @@
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
+
 public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
 {
     private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
@@ -70,35 +72,32 @@
         _changedData = (data == null);
         _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
 
-        _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
+        _strictAMQP =
+            Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
     }
 
     protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
-                                 AMQShortString routingKey, ByteBuffer data) throws AMQException
+        AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
         this(contentHeader, deliveryTag);
 
         Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
-        int contentType = (type == null) ? AMQDestination.UNKNOWN_TYPE : type.intValue();
 
         AMQDestination dest;
 
-        switch (contentType)
+        if (AMQDestination.QUEUE_TYPE.equals(type))
         {
-
-            case AMQDestination.QUEUE_TYPE:
-                dest = new AMQQueue(exchange, routingKey, routingKey);
-                break;
-
-            case AMQDestination.TOPIC_TYPE:
-                dest = new AMQTopic(exchange, routingKey, null);
-                break;
-
-            default:
-                dest = new AMQUndefinedDestination(exchange, routingKey, null);
-                break;
+            dest = new AMQQueue(exchange, routingKey, routingKey);
+        }
+        else if (AMQDestination.TOPIC_TYPE.equals(type))
+        {
+            dest = new AMQTopic(exchange, routingKey, null);
         }
-        //Destination dest = AMQDestination.createDestination(url);
+        else
+        {
+            dest = new AMQUndefinedDestination(exchange, routingKey, null);
+        }
+        // Destination dest = AMQDestination.createDestination(url);
         setJMSDestination(dest);
 
         _data = data;
@@ -202,7 +201,7 @@
         if (!(destination instanceof AMQDestination))
         {
             throw new IllegalArgumentException(
-                    "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+                "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
         }
 
         final AMQDestination amqd = (AMQDestination) destination;
@@ -613,7 +612,6 @@
     {
         getContentHeaderProperties().setHeaders(messageProperties);
     }
-
 
     public JMSHeaderAdapter getJmsHeaders()
     {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,51 +20,53 @@
  */
 package org.apache.qpid.client.message;
 
-import java.util.Iterator;
-import java.util.List;
-
-import javax.jms.JMSException;
-
-import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+
+import java.util.Iterator;
+import java.util.List;
+
 public abstract class AbstractJMSMessageFactory implements MessageFactory
 {
-    private static final Logger _logger = Logger.getLogger(AbstractJMSMessageFactory.class);
-
+    private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
 
-    protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data,
-                                                        AMQShortString exchange, AMQShortString routingKey,
-                                                        ContentHeaderBody contentHeader) throws AMQException;
+    protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange,
+        AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException;
 
-    protected AbstractJMSMessage createMessageWithBody(long messageNbr,
-                                                       ContentHeaderBody contentHeader,
-                                                       AMQShortString exchange, AMQShortString routingKey,
-                                                       List bodies) throws AMQException
+    protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
+        AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException
     {
         ByteBuffer data;
         final boolean debug = _logger.isDebugEnabled();
 
         // we optimise the non-fragmented case to avoid copying
-        if (bodies != null && bodies.size() == 1)
+        if ((bodies != null) && (bodies.size() == 1))
         {
-            if(debug)
+            if (debug)
             {
-                _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize +")");
+                _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")");
             }
-            data = ((ContentBody)bodies.get(0)).payload;
+
+            data = ((ContentBody) bodies.get(0)).payload;
         }
         else if (bodies != null)
         {
-            if(debug)
+            if (debug)
             {
-                _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize + ")");
+                _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize
+                    + ")");
             }
-            data = ByteBuffer.allocate((int)contentHeader.bodySize); // XXX: Is cast a problem?
+
+            data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem?
             final Iterator it = bodies.iterator();
             while (it.hasNext())
             {
@@ -72,27 +74,29 @@
                 data.put(cb.payload);
                 cb.payload.release();
             }
+
             data.flip();
         }
         else // bodies == null
         {
             data = ByteBuffer.allocate(0);
         }
-        if(debug)
+
+        if (debug)
         {
-            _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
+            _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining="
+                + data.remaining());
         }
 
         return createMessage(messageNbr, data, exchange, routingKey, contentHeader);
     }
 
-    public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered,
-                                            ContentHeaderBody contentHeader,
-                                            AMQShortString exchange, AMQShortString routingKey,
-                                            List bodies) throws JMSException, AMQException
+    public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
+        AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException
     {
         final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
         msg.setJMSRedelivered(redelivered);
+
         return msg;
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Fri Jun 15 09:28:46 2007
@@ -14,36 +14,38 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.client.message;
 
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
 import java.nio.charset.CharacterCodingException;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-
 public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
 {
-    private static final Logger _logger = Logger.getLogger(JMSMapMessage.class);
-
+    private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
 
     public static final String MIME_TYPE = "jms/map-message";
     private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
 
-    private Map<String,Object> _map = new HashMap<String, Object>();
+    private Map<String, Object> _map = new HashMap<String, Object>();
 
     public JMSMapMessage() throws JMSException
     {
@@ -56,24 +58,22 @@
         populateMapFromData();
     }
 
-
-    JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
-                  AMQShortString routingKey, ByteBuffer data) throws AMQException
+    JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+        ByteBuffer data) throws AMQException
     {
         super(messageNbr, contentHeader, exchange, routingKey, data);
         try
         {
             populateMapFromData();
-        }                                                        
+        }
         catch (JMSException je)
         {
             throw new AMQException(null, "Error populating MapMessage from ByteBuffer", je);
-            
+
         }
 
     }
 
-
     public String toBodyString() throws JMSException
     {
         return _map.toString();
@@ -84,16 +84,14 @@
         return MIME_TYPE_SHORT_STRING;
     }
 
-
     public ByteBuffer getData()
     {
-        //What if _data is null?
+        // What if _data is null?
         writeMapToData();
+
         return super.getData();
     }
 
-
-
     @Override
     public void clearBodyImpl() throws JMSException
     {
@@ -105,18 +103,18 @@
     {
         Object value = _map.get(propName);
 
-        if(value instanceof Boolean)
+        if (value instanceof Boolean)
         {
-            return ((Boolean)value).booleanValue();
+            return ((Boolean) value).booleanValue();
         }
-        else if((value instanceof String) || (value == null))
+        else if ((value instanceof String) || (value == null))
         {
-            return Boolean.valueOf((String)value);
+            return Boolean.valueOf((String) value);
         }
         else
         {
-            throw new MessageFormatException("Property " + propName + " of type " +
-                                             value.getClass().getName() + " cannot be converted to boolean.");
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to boolean.");
         }
 
     }
@@ -125,18 +123,18 @@
     {
         Object value = _map.get(propName);
 
-        if(value instanceof Byte)
+        if (value instanceof Byte)
         {
-            return ((Byte)value).byteValue();
+            return ((Byte) value).byteValue();
         }
-        else if((value instanceof String) || (value==null))
+        else if ((value instanceof String) || (value == null))
         {
-            return Byte.valueOf((String)value).byteValue();
+            return Byte.valueOf((String) value).byteValue();
         }
         else
         {
-            throw new MessageFormatException("Property " + propName + " of type " +
-                                             value.getClass().getName() + " cannot be converted to byte.");
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to byte.");
         }
     }
 
@@ -144,51 +142,50 @@
     {
         Object value = _map.get(propName);
 
-        if(value instanceof Short)
+        if (value instanceof Short)
         {
-            return ((Short)value).shortValue();
+            return ((Short) value).shortValue();
         }
-        else if(value instanceof Byte)
+        else if (value instanceof Byte)
         {
-            return ((Byte)value).shortValue();
+            return ((Byte) value).shortValue();
         }
-        else if((value instanceof String) || (value==null))
+        else if ((value instanceof String) || (value == null))
         {
-            return Short.valueOf((String)value).shortValue();
+            return Short.valueOf((String) value).shortValue();
         }
         else
         {
-            throw new MessageFormatException("Property " + propName + " of type " +
-                                             value.getClass().getName() + " cannot be converted to short.");
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to short.");
         }
 
     }
 
-
     public int getInt(String propName) throws JMSException
     {
         Object value = _map.get(propName);
 
-        if(value instanceof Integer)
+        if (value instanceof Integer)
         {
-            return ((Integer)value).intValue();
+            return ((Integer) value).intValue();
         }
-        else if(value instanceof Short)
+        else if (value instanceof Short)
         {
-            return ((Short)value).intValue();
+            return ((Short) value).intValue();
         }
-        else if(value instanceof Byte)
+        else if (value instanceof Byte)
         {
-            return ((Byte)value).intValue();
+            return ((Byte) value).intValue();
         }
-        else if((value instanceof String) || (value==null))
+        else if ((value instanceof String) || (value == null))
         {
-            return Integer.valueOf((String)value).intValue();
+            return Integer.valueOf((String) value).intValue();
         }
         else
         {
-            throw new MessageFormatException("Property " + propName + " of type " +
-                                             value.getClass().getName() + " cannot be converted to int.");
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to int.");
         }
 
     }
@@ -197,30 +194,32 @@
     {
         Object value = _map.get(propName);
 
-        if(value instanceof Long)
+        if (value instanceof Long)
         {
-            return ((Long)value).longValue();
+            return ((Long) value).longValue();
         }
-        else if(value instanceof Integer)
+        else if (value instanceof Integer)
         {
-            return ((Integer)value).longValue();
+            return ((Integer) value).longValue();
         }
-        if(value instanceof Short)
+
+        if (value instanceof Short)
         {
-            return ((Short)value).longValue();
+            return ((Short) value).longValue();
         }
-        if(value instanceof Byte)
+
+        if (value instanceof Byte)
         {
-            return ((Byte)value).longValue();
+            return ((Byte) value).longValue();
         }
-        else if((value instanceof String) || (value==null))
+        else if ((value instanceof String) || (value == null))
         {
-            return Long.valueOf((String)value).longValue();
+            return Long.valueOf((String) value).longValue();
         }
         else
         {
-            throw new MessageFormatException("Property " + propName + " of type " +
-                                             value.getClass().getName() + " cannot be converted to long.");
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to long.");
         }
 
     }
@@ -229,45 +228,43 @@
     {
         Object value = _map.get(propName);
 
-        if(!_map.containsKey(propName))
+        if (!_map.containsKey(propName))
         {
             throw new MessageFormatException("Property " + propName + " not present");
         }
-        else if(value instanceof Character)
+        else if (value instanceof Character)
         {
-            return ((Character)value).charValue();
+            return ((Character) value).charValue();
         }
         else if (value == null)
         {
-            throw new NullPointerException("Property " + propName + " has null value and therefore cannot " +
-                                           "be converted to char.");
+            throw new NullPointerException("Property " + propName + " has null value and therefore cannot "
+                + "be converted to char.");
         }
         else
         {
-            throw new MessageFormatException("Property " + propName + " of type " +
-                                             value.getClass().getName() + " cannot be converted to boolan.");
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to boolan.");
         }
 
     }
 
-
-
     public float getFloat(String propName) throws JMSException
     {
         Object value = _map.get(propName);
 
-        if(value instanceof Float)
+        if (value instanceof Float)
         {
-            return ((Float)value).floatValue();
+            return ((Float) value).floatValue();
         }
-        else if((value instanceof String) || (value==null))
+        else if ((value instanceof String) || (value == null))
         {
-            return Float.valueOf((String)value).floatValue();
+            return Float.valueOf((String) value).floatValue();
         }
         else
         {
-            throw new MessageFormatException("Property " + propName + " of type " +
-                                             value.getClass().getName() + " cannot be converted to float.");
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to float.");
         }
     }
 
@@ -275,22 +272,22 @@
     {
         Object value = _map.get(propName);
 
-        if(value instanceof Double)
+        if (value instanceof Double)
         {
-            return ((Double)value).doubleValue();
+            return ((Double) value).doubleValue();
         }
-        else if(value instanceof Float)
+        else if (value instanceof Float)
         {
-            return ((Float)value).doubleValue();
+            return ((Float) value).doubleValue();
         }
-        else if((value instanceof String) || (value==null))
+        else if ((value instanceof String) || (value == null))
         {
-            return Double.valueOf((String)value).doubleValue();
+            return Double.valueOf((String) value).doubleValue();
         }
         else
         {
-            throw new MessageFormatException("Property " + propName + " of type " +
-                                             value.getClass().getName() + " cannot be converted to double.");
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to double.");
         }
     }
 
@@ -298,14 +295,13 @@
     {
         Object value = _map.get(propName);
 
-        if((value instanceof String) || (value == null))
+        if ((value instanceof String) || (value == null))
         {
             return (String) value;
         }
-        else if(value instanceof byte[])
+        else if (value instanceof byte[])
         {
-            throw new MessageFormatException("Property " + propName + " of type byte[] " +
-                                             "cannot be converted to String.");
+            throw new MessageFormatException("Property " + propName + " of type byte[] " + "cannot be converted to String.");
         }
         else
         {
@@ -318,18 +314,18 @@
     {
         Object value = _map.get(propName);
 
-        if(!_map.containsKey(propName))
+        if (!_map.containsKey(propName))
         {
-            throw new MessageFormatException("Property " + propName + " not present");                        
+            throw new MessageFormatException("Property " + propName + " not present");
         }
-        else if((value instanceof byte[]) || (value == null))
+        else if ((value instanceof byte[]) || (value == null))
         {
-            return (byte[])value;
+            return (byte[]) value;
         }
         else
         {
-            throw new MessageFormatException("Property " + propName + " of type " +
-                                             value.getClass().getName() + " cannot be converted to byte[].");
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to byte[].");
         }
     }
 
@@ -343,7 +339,6 @@
         return Collections.enumeration(_map.keySet());
     }
 
-
     public void setBoolean(String propName, boolean b) throws JMSException
     {
         checkWritable();
@@ -416,46 +411,38 @@
 
     public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException
     {
-        if((offset == 0) && (length == bytes.length))
+        if ((offset == 0) && (length == bytes.length))
         {
-            setBytes(propName,bytes);
+            setBytes(propName, bytes);
         }
         else
         {
             byte[] newBytes = new byte[length];
-            System.arraycopy(bytes,offset,newBytes,0,length);
-            setBytes(propName,newBytes);
+            System.arraycopy(bytes, offset, newBytes, 0, length);
+            setBytes(propName, newBytes);
         }
     }
 
     public void setObject(String propName, Object value) throws JMSException
-    {                                                                                       
+    {
         checkWritable();
         checkPropertyName(propName);
-        if(value instanceof Boolean
-                || value instanceof Byte
-                || value instanceof Short
-                || value instanceof Integer
-                || value instanceof Long
-                || value instanceof Character
-                || value instanceof Float
-                || value instanceof Double
-                || value instanceof String
-                || value instanceof byte[]                
-                || value == null)
+        if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
+                || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
+                || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
         {
             _map.put(propName, value);
         }
         else
         {
-            throw new MessageFormatException("Cannot set property " + propName + " to value " + value +
-                                             "of type " + value.getClass().getName() + ".");
+            throw new MessageFormatException("Cannot set property " + propName + " to value " + value + "of type "
+                + value.getClass().getName() + ".");
         }
     }
 
     private void checkPropertyName(String propName)
     {
-        if(propName == null || propName.equals(""))
+        if ((propName == null) || propName.equals(""))
         {
             throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
         }
@@ -466,19 +453,18 @@
         return _map.containsKey(propName);
     }
 
-
     private void populateMapFromData() throws JMSException
     {
-        if(_data != null)
+        if (_data != null)
         {
             _data.rewind();
 
             final int entries = readIntImpl();
-            for(int i = 0; i < entries; i++)
+            for (int i = 0; i < entries; i++)
             {
                 String propName = readStringImpl();
                 Object value = readObject();
-                _map.put(propName,value);
+                _map.put(propName, value);
             }
         }
         else
@@ -492,7 +478,7 @@
         allocateInitialBuffer();
         final int size = _map.size();
         writeIntImpl(size);
-        for(Map.Entry<String, Object> entry : _map.entrySet())
+        for (Map.Entry<String, Object> entry : _map.entrySet())
         {
             try
             {
@@ -500,10 +486,10 @@
             }
             catch (CharacterCodingException e)
             {
-                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(),e);
-
+                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e);
 
             }
+
             try
             {
                 writeObject(entry.getValue());
@@ -511,14 +497,11 @@
             catch (JMSException e)
             {
                 Object value = entry.getValue();
-                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() +
-                        " value : " + value + " (type: " + value.getClass().getName() + ").",e);
+                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value
+                    + " (type: " + value.getClass().getName() + ").", e);
             }
         }
 
     }
-
-
-
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,8 @@
  */
 package org.apache.qpid.client.message;
 
-import java.util.Enumeration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -31,7 +32,7 @@
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 
-import org.apache.log4j.Logger;
+import java.util.Enumeration;
 
 public class MessageConverter
 {
@@ -39,7 +40,7 @@
     /**
      * Log4J logger
      */
-    protected final Logger _logger = Logger.getLogger(getClass());
+    protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
     /**
      * AbstractJMSMessage which will hold the converted message
@@ -81,6 +82,7 @@
             String name = (String) mapNames.nextElement();
             nativeMessage.setObject(name, message.getObject(name));
         }
+
         _newMessage = (AbstractJMSMessage) nativeMessage;
         setMessageProperties(message);
     }
@@ -121,15 +123,16 @@
         }
         catch (MessageEOFException e)
         {
-            //we're at the end so don't mind the exception
+            // we're at the end so don't mind the exception
         }
+
         _newMessage = (AbstractJMSMessage) nativeMessage;
         setMessageProperties(message);
     }
 
     public MessageConverter(Message message) throws JMSException
     {
-        //Send a message with just properties.
+        // Send a message with just properties.
         // Throwing away content
         BytesMessage nativeMessage = new JMSBytesMessage();
 
@@ -160,7 +163,7 @@
         while (propertyNames.hasMoreElements())
         {
             String propertyName = String.valueOf(propertyNames.nextElement());
-            //TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
+            // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
             if (!propertyName.startsWith("JMSX_"))
             {
                 Object value = message.getObjectProperty(propertyName);
@@ -190,6 +193,7 @@
         {
             _newMessage.setJMSReplyTo(message.getJMSReplyTo());
         }
+
         _newMessage.setJMSType(message.getJMSType());
 
         _newMessage.setJMSCorrelationID(message.getJMSCorrelationID());

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,10 +20,6 @@
  */
 package org.apache.qpid.client.protocol;
 
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
 import org.apache.log4j.Logger;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoHandlerAdapter;
@@ -34,10 +30,10 @@
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
-import org.apache.qpid.AMQChannelClosedException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.SSLConfiguration;
+import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverHandler;
 import org.apache.qpid.client.failover.FailoverState;
 import org.apache.qpid.client.state.AMQState;
@@ -60,9 +56,67 @@
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.ssl.SSLContextFactory;
 
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
 
+/**
+ * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
+ * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
+ * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the event
+ * on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, expressed in
+ * terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in terms of "message
+ * received" and so on.
+ *
+ * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
+ * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
+ * API calls through which an individual connection can be manipulated. This protocol handler talks to the network
+ * through MINA, in a behind the scenes role; it is not an exposed part of the client API.
+ *
+ * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
+ * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
+ * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
+ * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA
+ * sessions in the event of failover. See below for more information about this.
+ *
+ * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
+ * attributes. A more convenient, type-safe, container for session data is provided in the form of {@link
+ * AMQProtocolSession}.
+ *
+ * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
+ * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe
+ * wrapper as described above). This event handler is different, because dealing with failover complicates things. To
+ * the end client of an AMQConnection, a failed over connection is still handled through the same connection instance,
+ * but behind the scenes a new transport connection, and MINA session will have been created. The MINA session object
+ * cannot be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the
+ * old connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
+ * and the protocol session data is held outside of the MINA IOSession.
+ *
+ * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through. The
+ * filter chain is set up as a stack of event handlers that perform the following functions (working upwards from the
+ * network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, optionally
+ * handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Create the
+ * filter chain to filter this handler's events. <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link
+ * SSLFilter}, {@link ReadWriteThreadModel}.
+ *
+ * <tr><td> Maintain fail-over state. <tr><td> </table>
+ *
+ * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec filter
+ * before it mean not doing the read/write asynchronously but in the main filter thread?
+ * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of AMQProtocolSession
+ * and AMQConnection will be the same, so if there is high cohesion between them, they could be merged, although there
+ * is sense in keeping the session model separate. Will clarify things by having data held per protocol handler, per
+ * protocol session, per network connection, per channel, in separate classes, so that lifecycles of the fields match
+ * lifecycles of their containing objects.
+ */
 public class AMQProtocolHandler extends IoHandlerAdapter
 {
+    /** Used for debugging. */
     private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
 
     /**
@@ -74,8 +128,10 @@
     /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
     private volatile AMQProtocolSession _protocolSession;
 
+    /** Holds the state of the protocol session. */
     private AMQStateManager _stateManager = new AMQStateManager();
 
+    /** Holds the method listeners. */
     private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
 
     /**
@@ -91,15 +147,31 @@
      */
     private FailoverState _failoverState = FailoverState.NOT_STARTED;
 
+    /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
     private CountDownLatch _failoverLatch;
 
+    /** Defines the default timeout to use for synchronous protocol commands. */
     private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
 
+    /**
+     * Creates a new protocol handler, associated with the specified client connection instance.
+     *
+     * @param con The client connection that this is the event handler for.
+     */
     public AMQProtocolHandler(AMQConnection con)
     {
         _connection = con;
     }
 
+    /**
+     * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
+     * session, which filters the events handled by this handler. The filter chain consists of, handing off events to an
+     * asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
+     *
+     * @param session The MINA session.
+     *
+     * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
+     */
     public void sessionCreated(IoSession session) throws Exception
     {
         _logger.debug("Protocol session created for session " + System.identityHashCode(session));
@@ -119,16 +191,15 @@
         if (_connection.getSSLConfiguration() != null)
         {
             SSLConfiguration sslConfig = _connection.getSSLConfiguration();
-            SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+            SSLContextFactory sslFactory =
+                    new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
             SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
             sslFilter.setUseClientMode(true);
             session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
         }
 
-
         try
         {
-
             ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
             threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
             threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
@@ -142,35 +213,38 @@
         _protocolSession.init();
     }
 
-    public void sessionOpened(IoSession session) throws Exception
-    {
-        //System.setProperty("foo", "bar");
-    }
-
     /**
-     * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
-     * sessionClosed() depending on whether we were trying to send data at the time of failure.
+     * Called when the network connection is closed. This can happen, either because the client explicitly requested
+     * that the connection be closed, in which case nothing is done, or because the connection died. In the case where
+     * the connection died, an attempt to failover automatically to a new connection may be started. The failover
+     * process will be started, provided that it is the client's policy to allow failover, and provided that a failover
+     * has not already been started or failed.
+     *
+     * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught}
+     * may be called first followed by this method. This depends on whether the client was trying to send data at the
+     * time of the failure.
      *
-     * @param session
+     * @param session The MINA session.
      *
-     * @throws Exception
+     * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
+     * not otherwise? The above comment doesn't make that clear.
      */
-    public void sessionClosed(IoSession session) throws Exception
+    public void sessionClosed(IoSession session)
     {
         if (_connection.isClosed())
         {
-            _logger.info("Session closed called by client");
+            _logger.debug("Session closed called by client");
         }
         else
         {
-            _logger.info("Session closed called with failover state currently " + _failoverState);
+            _logger.debug("Session closed called with failover state currently " + _failoverState);
 
-            //reconnetablility was introduced here so as not to disturb the client as they have made their intentions
+            // reconnectability was introduced here so as not to disturb the client as they have made their intentions
             // known through the policy settings.
 
             if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
             {
-                _logger.info("FAILOVER STARTING");
+                _logger.debug("FAILOVER STARTING");
                 if (_failoverState == FailoverState.NOT_STARTED)
                 {
                     _failoverState = FailoverState.IN_PROGRESS;
@@ -178,12 +252,12 @@
                 }
                 else
                 {
-                    _logger.info("Not starting failover as state currently " + _failoverState);
+                    _logger.debug("Not starting failover as state currently " + _failoverState);
                 }
             }
             else
             {
-                _logger.info("Failover not allowed by policy.");
+                _logger.debug("Failover not allowed by policy."); // or already in progress?
 
                 if (_logger.isDebugEnabled())
                 {
@@ -199,12 +273,12 @@
                 }
                 else
                 {
-                    _logger.info("sessionClose() failover in progress");
+                    _logger.debug("sessionClose() failover in progress");
                 }
             }
         }
 
-        _logger.info("Protocol Session [" + this + "] closed");
+        _logger.debug("Protocol Session [" + this + "] closed");
     }
 
     /** See {@link FailoverHandler} to see rationale for separate thread. */
@@ -223,25 +297,32 @@
         _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
         if (IdleStatus.WRITER_IDLE.equals(status))
         {
-            //write heartbeat frame:
+            // write heartbeat frame:
             _logger.debug("Sent heartbeat");
             session.write(HeartbeatBody.FRAME);
             HeartbeatDiagnostics.sent();
         }
         else if (IdleStatus.READER_IDLE.equals(status))
         {
-            //failover:
+            // failover:
             HeartbeatDiagnostics.timeout();
             _logger.warn("Timed out while waiting for heartbeat from peer.");
             session.close();
         }
     }
 
-    public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+    /**
+     * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
+     * IOException, MINA will close the connection automatically.
+     *
+     * @param session The MINA session.
+     * @param cause   The exception that triggered this event.
+     */
+    public void exceptionCaught(IoSession session, Throwable cause)
     {
         if (_failoverState == FailoverState.NOT_STARTED)
         {
-            //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
+            // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
             if (cause instanceof AMQConnectionClosedException)
             {
                 _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
@@ -250,8 +331,8 @@
                 sessionClosed(session);
             }
 
-            //FIXME Need to correctly handle other exceptions. Things like ...
-//            if (cause instanceof AMQChannelClosedException)
+            // FIXME Need to correctly handle other exceptions. Things like ...
+            // if (cause instanceof AMQChannelClosedException)
             // which will cause the JMSSession to end due to a channel close and so that Session needs
             // to be removed from the map so we can correctly still call close without an exception when trying to close
             // the server closed session.  See also CloseChannelMethodHandler as the sessionClose is never called on exception
@@ -261,6 +342,7 @@
         else if (_failoverState == FailoverState.FAILED)
         {
             _logger.error("Exception caught by protocol handler: " + cause, cause);
+
             // we notify the state manager of the error in case we have any clients waiting on a state
             // change. Those "waiters" will be interrupted and can handle the exception
             AMQException amqe = new AMQException(null, "Protocol handler error: " + cause, cause);
@@ -297,7 +379,7 @@
         final boolean debug = _logger.isDebugEnabled();
         final long msgNumber = ++_messageReceivedCount;
 
-        if (debug && (msgNumber % 1000 == 0))
+        if (debug && ((msgNumber % 1000) == 0))
         {
             _logger.debug("Received " + _messageReceivedCount + " protocol messages");
         }
@@ -317,7 +399,8 @@
                     _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
                 }
 
-                final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+                final AMQMethodEvent<AMQMethodBody> evt =
+                        new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
 
                 try
                 {
@@ -331,10 +414,16 @@
                             final AMQMethodListener listener = (AMQMethodListener) it.next();
                             wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
                         }
+                        if (!wasAnyoneInterested)
+                        {
+                            throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:" + _frameListeners, null);
+                        }
                     }
+
                     if (!wasAnyoneInterested)
                     {
-                        throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:" + _frameListeners, null);
+                        throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
+                                                     + _frameListeners, null);
                     }
                 }
                 catch (AMQException e)
@@ -349,20 +438,20 @@
                             listener.error(e);
                         }
                     }
+
                     exceptionCaught(session, e);
                 }
+
                 break;
 
             case ContentHeaderBody.TYPE:
 
-                _protocolSession.messageContentHeaderReceived(frame.getChannel(),
-                                                              (ContentHeaderBody) bodyFrame);
+                _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
                 break;
 
             case ContentBody.TYPE:
 
-                _protocolSession.messageContentBodyReceived(frame.getChannel(),
-                                                            (ContentBody) bodyFrame);
+                _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
                 break;
 
             case HeartbeatBody.TYPE:
@@ -371,11 +460,13 @@
                 {
                     _logger.debug("Received heartbeat");
                 }
+
                 break;
 
             default:
 
         }
+
         _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
     }
 
@@ -387,10 +478,11 @@
 
         final boolean debug = _logger.isDebugEnabled();
 
-        if (debug && (sentMessages % 1000 == 0))
+        if (debug && ((sentMessages % 1000) == 0))
         {
             _logger.debug("Sent " + _messagesOut + " protocol messages");
         }
+
         _connection.bytesSent(session.getWrittenBytes());
         if (debug)
         {
@@ -408,7 +500,7 @@
       {
           _frameListeners.remove(listener);
       }
-    */
+     */
     public void attainState(AMQState s) throws AMQException
     {
         getStateManager().attainState(s);
@@ -437,9 +529,8 @@
      * @param frame
      * @param listener the blocking listener. Note the calling thread will block.
      */
-    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
-                                                           BlockingMethodFrameListener listener)
-            throws AMQException
+    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener)
+            throws AMQException, FailoverException
     {
         return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
     }
@@ -451,9 +542,8 @@
      * @param frame
      * @param listener the blocking listener. Note the calling thread will block.
      */
-    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
-                                                           BlockingMethodFrameListener listener, long timeout)
-            throws AMQException
+    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener,
+                                                           long timeout) throws AMQException, FailoverException
     {
         try
         {
@@ -461,6 +551,7 @@
             _protocolSession.writeFrame(frame);
 
             AMQMethodEvent e = listener.blockForFrame(timeout);
+
             return e;
             // When control resumes before this line, a reply will have been received
             // that matches the criteria defined in the blocking listener
@@ -478,25 +569,33 @@
     }
 
     /** More convenient method to write a frame and wait for it's response. */
-    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
+    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException
     {
         return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
     }
 
     /** More convenient method to write a frame and wait for it's response. */
-    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
+    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException
     {
-        return writeCommandFrameAndWaitForReply(frame,
-                                                new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
+        return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
+                                                timeout);
     }
 
-
-
     public void closeSession(AMQSession session) throws AMQException
     {
         _protocolSession.closeSession(session);
     }
 
+    /**
+     * Closes the connection.
+     *
+     * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
+     * anyway.
+     *
+     * @param timeout The timeout to wait for an acknowledgement to the close request.
+     *
+     * @throws AMQException If the close fails for any reason.
+     */
     public void closeConnection(long timeout) throws AMQException
     {
         getStateManager().changeState(AMQState.CONNECTION_CLOSING);
@@ -504,13 +603,13 @@
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
-                                                                  _protocolSession.getProtocolMajorVersion(),
-                                                                  _protocolSession.getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                  0,    // classId
-                                                                  0,    // methodId
-                                                                  AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
-                                                                  new AMQShortString("JMS client is closing the connection."));    // replyText
+        final AMQFrame frame =
+                ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
+                                                   _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
+                                                   0, // classId
+                                                   0, // methodId
+                                                   AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                                                   new AMQShortString("JMS client is closing the connection.")); // replyText
 
         try
         {
@@ -521,8 +620,10 @@
         {
             _protocolSession.closeProtocolSession(false);
         }
-
-
+        catch (FailoverException e)
+        {
+            _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+        }
     }
 
     /** @return the number of bytes read from this protocol session */
@@ -603,7 +704,6 @@
     {
         return _protocolSession.getProtocolMajorVersion();
     }
-
 
     public byte getProtocolMinorVersion()
     {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Jun 15 09:28:46 2007
@@ -20,16 +20,8 @@
  */
 package org.apache.qpid.client.protocol;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
-
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.log4j.Logger;
-
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoSession;
@@ -53,16 +45,24 @@
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 /**
  * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
  * session is still available but clients should not use it to obtain session attributes.
  */
 public class AMQProtocolSession implements AMQVersionAwareProtocolSession
 {
-
     protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
 
-    protected static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+    protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class);
 
     public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,71 +27,137 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 
+/**
+ * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of
+ * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or
+ * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this
+ * differs from a 'rendezvous' in that sense.
+ *
+ * <p/>BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response.
+ * They are always used in a 'one-shot' manner, that is, to receive just one response. Usually the caller has to register
+ * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
+ * have been completed.
+ *
+ * <p/>The {@link #processMethod} must return <tt>true</tt> on any incoming method that it handles. This indicates to
+ * this listener that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to
+ * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The
+ * channel id must be passed to the constructor.
+ *
+ * <p/>Errors from the producer are rethrown to the consumer.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept notification of AMQP method events. <td> {@link AMQMethodEvent}
+ * <tr><td> Delegate handling of the method to another method listener. <td> {@link AMQMethodBody}
+ * <tr><td> Block until a method is handled by the delegated to handler.
+ * <tr><td> Propagate the most recent exception to the consumer.
+ * </table>
+ *
+ * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
+ *       methodReceived method. The processMethod takes an additional channelId, however none of the implementations
+ *       seem to use it. So wrapping the listeners is possible.
+ *
+ * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener,
+ *       overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot
+ *       behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for
+ *       method has been received.
+ *
+ * @todo Interruption is caught but not handled. This could be allowed to fall through. This might actually be useful
+ *       for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
+ *       when this happens. At the very least, restore the interrupted status flag.
+ *
+ * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
+ *       check that SynchronousQueue has a non-blocking put method available.
+ */
 public abstract class BlockingMethodFrameListener implements AMQMethodListener
 {
+    /** This flag is used to indicate that the blocked for method has been received. */
     private volatile boolean _ready = false;
 
-    public abstract boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException;
-
+    /** Used to protect the shared event and ready flag between the producer and consumer. */
     private final Object _lock = new Object();
 
-    /**
-     * This is set if there is an exception thrown from processCommandFrame and the
-     * exception is rethrown to the caller of blockForFrame()
-     */
+    /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
     private volatile Exception _error;
 
+    /** Holds the channel id for the channel upon which this listener is waiting for a response. */
     protected int _channelId;
 
+    /** Holds the incoming method. */
     protected AMQMethodEvent _doneEvt = null;
 
+    /**
+     * Creates a new method listener that filters incoming methods to just those that match the specified channel id.
+     *
+     * @param channelId The channel id to filter incoming methods with.
+     */
     public BlockingMethodFrameListener(int channelId)
     {
         _channelId = channelId;
     }
 
     /**
-     * This method is called by the MINA dispatching thread. Note that it could
-     * be called before blockForFrame() has been called.
+     * Delegates any additional handling of the incoming methods to another handler.
      *
-     * @param evt the frame event
-     * @return true if the listener has dealt with this frame
-     * @throws AMQException
+     * @param channelId The channel id of the incoming method.
+     * @param frame     The method body.
+     *
+     * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
      */
-    public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+    public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException;
+
+    /**
+     * Informs this listener that an AMQP method has been received.
+     *
+     * @param evt The AMQP method.
+     *
+     * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
+     */
+    public boolean methodReceived(AMQMethodEvent evt) // throws AMQException
     {
         AMQMethodBody method = evt.getMethod();
 
-        try
+        /*try
+        {*/
+        boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
+
+        if (ready)
         {
-            boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
-            if (ready)
+            // we only update the flag from inside the synchronized block
+            // so that the blockForFrame method cannot "miss" an update - it
+            // will only ever read the flag from within the synchronized block
+            synchronized (_lock)
             {
-                // we only update the flag from inside the synchronized block
-                // so that the blockForFrame method cannot "miss" an update - it
-                // will only ever read the flag from within the synchronized block
-                synchronized (_lock)
-                {
-                    _doneEvt = evt;
-                    _ready = ready;
-                    _lock.notify();
-                }
+                _doneEvt = evt;
+                _ready = ready;
+                _lock.notify();
             }
-            return ready;
         }
+
+        return ready;
+
+        /*}
         catch (AMQException e)
         {
             error(e);
             // we rethrow the error here, and the code in the frame dispatcher will go round
             // each listener informing them that an exception has been thrown
             throw e;
-        }
+        }*/
     }
 
     /**
-     * This method is called by the thread that wants to wait for a frame.
+     * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout
+     * has passed.
+     *
+     * @param timeout The timeout in milliseconds.
+     *
+     * @return The AMQP method that was received.
+     *
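+     * @throws AMQException      If an AMQException was passed to {@link #error(Exception)}.
+     * @throws FailoverException If a FailoverException was passed to {@link #error(Exception)}.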
      */
-    public AMQMethodEvent blockForFrame(long timeout) throws AMQException
+    public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
     {
         synchronized (_lock)
         {
@@ -117,24 +183,25 @@
                 catch (InterruptedException e)
                 {
                     // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
-//                    if (!_ready && timeout != -1)
-//                    {
-//                        _error = new AMQException("Server did not respond timely");
-//                        _ready = true;
-//                    }
+                    // if (!_ready && timeout != -1)
+                    // {
+                    // _error = new AMQException("Server did not respond timely");
+                    // _ready = true;
+                    // }
                 }
             }
         }
+
         if (_error != null)
         {
             if (_error instanceof AMQException)
             {
-                throw(AMQException) _error;
+                throw (AMQException) _error;
             }
             else if (_error instanceof FailoverException)
             {
-                // This should ensure that FailoverException is not wrapped and can be caught.                
-                throw(FailoverException) _error;  // needed to expose FailoverException.
+                // This should ensure that FailoverException is not wrapped and can be caught.
+                throw (FailoverException) _error; // needed to expose FailoverException.
             }
             else
             {
@@ -156,6 +223,7 @@
         // set the error so that the thread that is blocking (against blockForFrame())
         // can pick up the exception and rethrow to the caller
         _error = e;
+
         synchronized (_lock)
         {
             _ready = true;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,11 +20,12 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class HeartbeatConfig
 {
-    private static final Logger _logger = Logger.getLogger(HeartbeatConfig.class);
+    private static final Logger _logger = LoggerFactory.getLogger(HeartbeatConfig.class);
     static final HeartbeatConfig CONFIG = new HeartbeatConfig();
 
     /**
@@ -35,13 +36,13 @@
     HeartbeatConfig()
     {
         String property = System.getProperty("amqj.heartbeat.timeoutFactor");
-        if(property != null)
+        if (property != null)
         {
             try
             {
                 timeoutFactor = Float.parseFloat(property);
             }
-            catch(NumberFormatException e)
+            catch (NumberFormatException e)
             {
                 _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property);
             }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java Fri Jun 15 09:28:46 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.apache.log4j.Logger;
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoSession;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message
  * when a threshold has been exceeded, and has a frequency configuration so that messages are not output
@@ -32,13 +34,13 @@
  */
 public class ProtocolBufferMonitorFilter extends IoFilterAdapter
 {
-    private static final Logger _logger = Logger.getLogger(ProtocolBufferMonitorFilter.class);
+    private static final Logger _logger = LoggerFactory.getLogger(ProtocolBufferMonitorFilter.class);
 
     public static long DEFAULT_FREQUENCY = 5000;
 
     public static int DEFAULT_THRESHOLD = 3000;
 
-    private int  _bufferedMessages = 0;
+    private int _bufferedMessages = 0;
 
     private int _threshold;
 
@@ -58,7 +60,7 @@
         _outputFrequencyInMillis = frequency;
     }
 
-    public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception
     {
         _bufferedMessages++;
         if (_bufferedMessages > _threshold)
@@ -66,8 +68,8 @@
             long now = System.currentTimeMillis();
             if ((now - _lastMessageOutputTime) > _outputFrequencyInMillis)
             {
-                _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: " +
-                             _bufferedMessages);
+                _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: "
+                    + _bufferedMessages);
                 _lastMessageOutputTime = now;
             }
         }
@@ -75,7 +77,7 @@
         nextFilter.messageReceived(session, message);
     }
 
-    public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
+    public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
     {
         _bufferedMessages--;
         nextFilter.messageSent(session, message);