You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [14/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker...

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Aug 14 20:40:49 2008
@@ -26,7 +26,6 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 /**
@@ -35,7 +34,7 @@
 public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
 {
     public static final String MIME_TYPE="jms/stream-message";
-    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
 
 
     /**
@@ -44,38 +43,40 @@
      */
     private int _byteArrayRemaining = -1;
 
-    public JMSStreamMessage()
+    public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory)
     {
-        this(null);
+        this(delegateFactory,null);
+
     }
 
     /**
      * Construct a stream message with existing data.
      *
+     * @param delegateFactory
      * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
-     *             set to auto expand
      */
-    JMSStreamMessage(ByteBuffer data)
+    JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
     {
-        super(data); // this instanties a content header
-    }
 
+        super(delegateFactory, data); // this instanties a content header
+    }
 
-    JMSStreamMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
-                     AMQShortString routingKey, ByteBuffer data) throws AMQException
+    JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        super(messageNbr, contentHeader, exchange, routingKey, data);
+
+        super(delegate, data);
     }
 
+
     public void reset()
     {
         super.reset();
         _readableMessage = true;
     }
 
-    public AMQShortString getMimeTypeAsShortString()
+    protected String getMimeType()
     {
-        return MIME_TYPE_SHORT_STRING;
+        return MIME_TYPE;
     }
 
 

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -25,19 +25,16 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
 {
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
-                                               AMQShortString exchange, AMQShortString routingKey,
-                                               BasicContentHeaderProperties contentHeader) throws AMQException
+    protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+        return new JMSStreamMessage(delegate, data);
     }
-    public AbstractJMSMessage createMessage() throws JMSException
+    public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
-        return new JMSStreamMessage();
+        return new JMSStreamMessage(delegateFactory);
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Thu Aug 14 20:40:49 2008
@@ -31,60 +31,49 @@
 import org.apache.qpid.client.CustomJMSXProperty;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.util.Strings;
 
 public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
 {
     private static final String MIME_TYPE = "text/plain";
-    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
-
 
     private String _decodedValue;
 
     /**
      * This constant represents the name of a property that is set when the message payload is null.
      */
-    private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName();
+    private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString();
     private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
 
-    public JMSTextMessage() throws JMSException
+    public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
-        this(null, null);
+        this(delegateFactory, null, null);
     }
 
-    JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+    JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException
     {
-        super(data); // this instantiates a content header
-        getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
-        getContentHeaderProperties().setEncoding(encoding);
+        super(delegateFactory, data); // this instantiates a content header
+        setContentType(getMimeType());
+        setEncoding(encoding);
     }
 
-    JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
-                   AMQShortString routingKey, ByteBuffer data)
+    JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
             throws AMQException
     {
-        super(deliveryTag, contentHeader, exchange, routingKey, data);
-        contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
+        super(delegate, data);
+        setContentType(getMimeType());
         _data = data;
     }
 
-    JMSTextMessage(ByteBuffer data) throws JMSException
-    {
-        this(data, null);
-    }
-
-    JMSTextMessage(String text) throws JMSException
-    {
-        super((ByteBuffer) null);
-        setText(text);
-    }
 
     public void clearBodyImpl() throws JMSException
     {
         if (_data != null)
         {
             _data.release();
+            _data = null;
         }
-        _data = null;
+
         _decodedValue = null;
     }
 
@@ -93,14 +82,9 @@
         return getText();
     }
 
-    public void setData(ByteBuffer data)
+    protected String getMimeType()
     {
-        _data = data;
-    }
-
-    public AMQShortString getMimeTypeAsShortString()
-    {
-        return MIME_TYPE_SHORT_STRING;
+        return MIME_TYPE;
     }
 
     public void setText(String text) throws JMSException
@@ -111,20 +95,17 @@
         try
         {
             if (text != null)
-            {                
-                _data = ByteBuffer.allocate(text.length());
-                _data.limit(text.length()) ;
-                //_data.sweep();
-                _data.setAutoExpand(true);
-                final String encoding = getContentHeaderProperties().getEncodingAsString();
-                if (encoding == null)
+            {
+                final String encoding = getEncoding();
+                if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
                 {
-                    _data.put(text.getBytes(DEFAULT_CHARSET.name()));
+                    _data = ByteBuffer.wrap(Strings.toUTF8(text));
                 }
                 else
                 {
-                    _data.put(text.getBytes(encoding));
+                    _data = ByteBuffer.wrap(text.getBytes(encoding));
                 }
+                _data.position(_data.limit());
                 _changedData=true;
             }
             _decodedValue = text;
@@ -156,11 +137,11 @@
             {
                 return null;
             }
-            if (getContentHeaderProperties().getEncodingAsString() != null)
+            if (getEncoding() != null)
             {
                 try
                 {
-                    _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder());
+                    _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder());
                 }
                 catch (CharacterCodingException e)
                 {
@@ -199,4 +180,6 @@
             removeProperty(PAYLOAD_NULL_PROPERTY);
         }
     }
+
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -26,21 +26,17 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
 
 public class JMSTextMessageFactory extends AbstractJMSMessageFactory
 {
 
-    public AbstractJMSMessage createMessage() throws JMSException
+    public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
-        return new JMSTextMessage();
+        return new JMSTextMessage(delegateFactory);
     }
 
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
-                                               AMQShortString exchange, AMQShortString routingKey, 
-                                               BasicContentHeaderProperties contentHeader) throws AMQException
+    protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        return new JMSTextMessage(deliveryTag,  contentHeader, 
-                                  exchange, routingKey, data);
+        return new JMSTextMessage(delegate, data);
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Thu Aug 14 20:40:49 2008
@@ -22,15 +22,9 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQSession;
 
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageEOFException;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
+import javax.jms.*;
 
 import java.util.Enumeration;
 
@@ -52,12 +46,11 @@
         _newMessage = message;
     }
 
-    public MessageConverter(BytesMessage message) throws JMSException
+    public MessageConverter(AMQSession session, BytesMessage bytesMessage) throws JMSException
     {
-        BytesMessage bytesMessage = (BytesMessage) message;
         bytesMessage.reset();
 
-        JMSBytesMessage nativeMsg = new JMSBytesMessage();
+        JMSBytesMessage nativeMsg = (JMSBytesMessage) session.createBytesMessage();
 
         byte[] buf = new byte[1024];
 
@@ -69,12 +62,12 @@
         }
 
         _newMessage = nativeMsg;
-        setMessageProperties(message);
+        setMessageProperties(bytesMessage);
     }
 
-    public MessageConverter(MapMessage message) throws JMSException
+    public MessageConverter(AMQSession session, MapMessage message) throws JMSException
     {
-        MapMessage nativeMessage = new JMSMapMessage();
+        MapMessage nativeMessage = session.createMapMessage();
 
         Enumeration mapNames = message.getMapNames();
         while (mapNames.hasMoreElements())
@@ -87,21 +80,21 @@
         setMessageProperties(message);
     }
 
-    public MessageConverter(ObjectMessage message) throws JMSException
+    public MessageConverter(AMQSession session, ObjectMessage origMessage) throws JMSException
     {
-        ObjectMessage origMessage = (ObjectMessage) message;
-        ObjectMessage nativeMessage = new JMSObjectMessage();
+
+        ObjectMessage nativeMessage = session.createObjectMessage();
 
         nativeMessage.setObject(origMessage.getObject());
 
         _newMessage = (AbstractJMSMessage) nativeMessage;
-        setMessageProperties(message);
+        setMessageProperties(origMessage);
 
     }
 
-    public MessageConverter(TextMessage message) throws JMSException
+    public MessageConverter(AMQSession session, TextMessage message) throws JMSException
     {
-        TextMessage nativeMessage = new JMSTextMessage();
+        TextMessage nativeMessage = session.createTextMessage();
 
         nativeMessage.setText(message.getText());
 
@@ -109,9 +102,9 @@
         setMessageProperties(message);
     }
 
-    public MessageConverter(StreamMessage message) throws JMSException
+    public MessageConverter(AMQSession session, StreamMessage message) throws JMSException
     {
-        StreamMessage nativeMessage = new JMSStreamMessage();
+        StreamMessage nativeMessage = session.createStreamMessage();
 
         try
         {
@@ -130,11 +123,11 @@
         setMessageProperties(message);
     }
 
-    public MessageConverter(Message message) throws JMSException
+    public MessageConverter(AMQSession session, Message message) throws JMSException
     {
         // Send a message with just properties.
         // Throwing away content
-        BytesMessage nativeMessage = new JMSBytesMessage();
+        Message nativeMessage = session.createMessage();
 
         _newMessage = (AbstractJMSMessage) nativeMessage;
         setMessageProperties(message);

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Thu Aug 14 20:40:49 2008
@@ -27,7 +27,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.transport.Struct;
+import org.apache.qpid.transport.Struct;
 
 
 public interface MessageFactory
@@ -39,10 +39,9 @@
         throws JMSException, AMQException;
 
      AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
-                                     Struct[] contentHeader,
-                                     AMQShortString exchange, AMQShortString routingKey,
-                                     List bodies, String replyToURL)
+                                      Struct[] contentHeader,
+                                      java.nio.ByteBuffer body)
         throws JMSException, AMQException;
 
-    AbstractJMSMessage createMessage() throws JMSException;
+    AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException;
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Aug 14 20:40:49 2008
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.nio.ByteBuffer;
 
 import javax.jms.JMSException;
 
@@ -30,9 +31,10 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.transport.Struct;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.DeliveryProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,8 +94,7 @@
      * @param deliveryTag   the AMQ message id
      * @param redelivered   true if redelivered
      * @param contentHeader the content header that was received
-     * @param bodies        a list of ContentBody instances
-     * @return the message.
+     * @param bodies        a list of ContentBody instances @return the message.
      * @throws AMQException
      * @throws JMSException
      */
@@ -120,30 +121,35 @@
         }
     }
 
-    public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
-                                            AMQShortString routingKey, Struct[] contentHeader, List bodies,
-                                            String replyTo) throws AMQException, JMSException
+    public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException
     {
-        MessageProperties mprop = (MessageProperties) contentHeader[0];
+
+        MessageProperties mprop = transfer.getHeader().get(MessageProperties.class);
         String messageType = mprop.getContentType();
         if (messageType == null)
         {
             _logger.debug("no message type specified, building a byte message");
             messageType = JMSBytesMessage.MIME_TYPE;
         }
-        MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType));
+        MessageFactory mf = _mimeStringToFactoryMap.get(messageType);
         if (mf == null)
         {
             throw new AMQException(null, "Unsupport MIME type of " + messageType, null);
         }
         else
         {
-            return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, replyTo);
+            boolean redelivered = false;
+            DeliveryProperties deliverProps;
+            if((deliverProps = transfer.getHeader().get(DeliveryProperties.class)) != null)
+            {
+                redelivered = deliverProps.getRedelivered();
+            }
+            return mf.createMessage(transfer.getId(), redelivered, transfer.getHeader().getStructs(), transfer.getBody());
         }
     }
 
 
-    public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
+    public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory, String mimeType) throws AMQException, JMSException
     {
         if (mimeType == null)
         {
@@ -157,7 +163,7 @@
         }
         else
         {
-            return mf.createMessage();
+            return mf.createMessage(delegateFactory);
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Thu Aug 14 20:40:49 2008
@@ -7,9 +7,9 @@
     final private AMQShortString  _replyText;
     final private int _replyCode;
 
-    public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode)
+    public ReturnMessage(AMQShortString exchange, AMQShortString routingKey, AMQShortString replyText, int replyCode)
     {
-        super(channelId,-1,null,exchange,routingKey,false);
+        super(-1,0,exchange,routingKey,false);
         _replyText = replyText;
         _replyCode = replyCode;
     }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Thu Aug 14 20:40:49 2008
@@ -20,23 +20,7 @@
  */
 package org.apache.qpid.client.message;
 
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQChannelException;
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.AMQException;
 import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 
 /**
@@ -46,65 +30,24 @@
  * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
  * thread in order to minimise the amount of work done in the MINA dispatcher thread.
  */
-public abstract class UnprocessedMessage<H,B>
+public abstract class UnprocessedMessage
 {
-    private final int _channelId;
-    private final long _deliveryId;
-    private final AMQShortString _consumerTag;
-    protected AMQShortString _exchange;
-    protected AMQShortString _routingKey;
-    protected boolean _redelivered;
+    private final int _consumerTag;
+
 
-    public UnprocessedMessage(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    public UnprocessedMessage(int consumerTag)
     {
-        _channelId = channelId;
-        _deliveryId = deliveryId;
         _consumerTag = consumerTag;
-        _exchange = exchange;
-        _routingKey = routingKey;
-        _redelivered = redelivered;
     }
 
-    public abstract void receiveBody(B nativeMessageBody);
 
-    public abstract void setContentHeader(H nativeMessageHeader);
-
-    public int getChannelId()
-    {
-        return _channelId;
-    }
+    abstract public long getDeliveryTag();
 
-    public long getDeliveryTag()
-    {
-        return _deliveryId;
-    }
 
-    public AMQShortString getConsumerTag()
+    public int getConsumerTag()
     {
         return _consumerTag;
     }
 
-    public AMQShortString getExchange()
-    {
-        return _exchange;
-    }
-
-    public AMQShortString getRoutingKey()
-    {
-        return _routingKey;
-    }
 
-    public boolean isRedelivered()
-    {
-        return _redelivered;
-    }
-    public abstract List<B> getBodies();
-  
-    public abstract H getContentHeader();
- 
-    // specific to 0_10
-    public String getReplyToURL()
-    {
-        return "";
-    }    
-}
+}
\ No newline at end of file

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Thu Aug 14 20:40:49 2008
@@ -20,13 +20,7 @@
  */
 package org.apache.qpid.client.message;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.Struct;
+import org.apache.qpid.transport.MessageTransfer;
 
 /**
  * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -35,58 +29,25 @@
  * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
  * thread in order to minimise the amount of work done in the MINA dispatcher thread.
  */
-public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
+public class UnprocessedMessage_0_10 extends UnprocessedMessage
 {
-    private Struct[] _headers;
-    private String _replyToURL;
+    private MessageTransfer _transfer;
 
-    /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
-    private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
-
-    public UnprocessedMessage_0_10(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
-    {
-        super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
-    }
-
-    public void receiveBody(ByteBuffer body)
-    {
-
-        _bodies.add(body);
-    }
-
-    public void setContentHeader(Struct[] headers)
+    public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr)
     {
-        this._headers = headers;
-        for(Struct s: headers)
-        {
-            if (s instanceof DeliveryProperties)
-            {
-                DeliveryProperties props = (DeliveryProperties)s;
-                _exchange = new AMQShortString(props.getExchange());
-                _routingKey = new AMQShortString(props.getRoutingKey());
-                _redelivered = props.getRedelivered();
-            }
-        }
-    }
-
-    public Struct[] getContentHeader()
-    {
-        return _headers;
-    }
-
-    public List<ByteBuffer> getBodies()
-    {
-        return _bodies;
+        super(consumerTag);
+        _transfer = xfr;
     }
 
     // additional 0_10 method
-    public String getReplyToURL()
+
+    public long getDeliveryTag()
     {
-        return _replyToURL;
+        return _transfer.getId();
     }
 
-    public void setReplyToURL(String url)
+    public MessageTransfer getMessageTransfer()
     {
-        _replyToURL = url;
+        return _transfer;
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Thu Aug 14 20:40:49 2008
@@ -26,7 +26,6 @@
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 
@@ -37,32 +36,54 @@
  * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
  * thread in order to minimise the amount of work done in the MINA dispatcher thread.
  */
-public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody,ContentBody>
+public class UnprocessedMessage_0_8 extends UnprocessedMessage
 {
     private long _bytesReceived = 0;
 
+
+    private AMQShortString _exchange;
+    private AMQShortString _routingKey;
+    private final long _deliveryId;
+    protected boolean _redelivered;
+
     private BasicDeliverBody _deliverBody;
     private ContentHeaderBody _contentHeader;
 
     /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
     private List<ContentBody> _bodies;
 
-    public UnprocessedMessage_0_8(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    public UnprocessedMessage_0_8(long deliveryId, int consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
+    {
+        super(consumerTag);
+        _exchange = exchange;
+        _routingKey = routingKey;
+
+        _redelivered = redelivered;
+        _deliveryId = deliveryId;
+    }
+
+
+    public AMQShortString getExchange()
+    {
+        return _exchange;
+    }
+
+    public AMQShortString getRoutingKey()
     {
-        super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
+        return _routingKey;
     }
 
-    public UnprocessedMessage_0_8(int channelId, BasicReturnBody body)
+    public long getDeliveryTag()
     {
-        //FIXME: TGM, SRSLY 4RL
-        super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false);
+        return _deliveryId;
     }
 
-    public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body)
+    public boolean isRedelivered()
     {
-        super(channelId, body.getDeliveryTag(), body.getConsumerTag(), body.getExchange(), body.getRoutingKey(), false);
+        return _redelivered;
     }
 
+
     public void receiveBody(ContentBody body)
     {
 
@@ -124,7 +145,7 @@
     public String toString()
     {
         StringBuilder buf = new StringBuilder();
-        buf.append("Channel Id : " + this.getChannelId());
+
         if (_contentHeader != null)
         {
           buf.append("ContentHeader " + _contentHeader);

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Aug 14 20:40:49 2008
@@ -43,14 +43,17 @@
 import org.apache.qpid.client.failover.FailoverState;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.io.IoTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,28 +103,29 @@
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Create the filter chain to filter this handlers events.
- *     <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
+ * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
  *
  * <tr><td> Maintain fail-over state.
  * <tr><td>
  * </table>
  *
  * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- *       async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- *       anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
- *       filter before it mean not doing the read/write asynchronously but in the main filter thread?
- *
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the main filter thread?
  * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- *       failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- *       AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- *       be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
- *       held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
- *       that lifecycles of the fields match lifecycles of their containing objects.
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
  */
 public class AMQProtocolHandler extends IoHandlerAdapter
 {
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
+    private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
+    private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
 
     /**
      * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
@@ -136,7 +140,7 @@
     private AMQStateManager _stateManager = new AMQStateManager();
 
     /** Holds the method listeners, */
-    private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+    private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
 
     /**
      * We create the failover handler when the session is created since it needs a reference to the IoSession in order
@@ -154,14 +158,12 @@
     /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
     private CountDownLatch _failoverLatch;
 
-
     /** The last failover exception that occured */
     private FailoverException _lastFailoverException;
 
     /** Defines the default timeout to use for synchronous protocol commands. */
     private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
 
-
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
      *
@@ -245,11 +247,27 @@
                 _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
             }
         }
-        _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+        _protocolSession = new AMQProtocolSession(this, session, _connection);
+
+        _stateManager.setProtocolSession(_protocolSession);
+
         _protocolSession.init();
     }
 
     /**
+     * Called when we want to create a new IoTransport session
+     * @param brokerDetail 
+     */
+    public void createIoTransportSession(BrokerDetails brokerDetail)
+    {
+        _protocolSession = new AMQProtocolSession(this, _connection);
+        _stateManager.setProtocolSession(_protocolSession);
+        IoTransport.connect_0_9(getProtocolSession(),
+                brokerDetail.getHost(), brokerDetail.getPort());
+        _protocolSession.init();
+    }
+    
+    /**
      * Called when the network connection is closed. This can happen, either because the client explicitly requested
      * that the connection be closed, in which case nothing is done, or because the connection died. In the case
      * where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -263,7 +281,7 @@
      * @param session The MINA session.
      *
      * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
-     *       not otherwise? The above comment doesn't make that clear.
+     * not otherwise? The above comment doesn't make that clear.
      */
     public void sessionClosed(IoSession session)
     {
@@ -374,7 +392,7 @@
                                  "cause isn't AMQConnectionClosedException: " + cause, cause);
 
                     AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
-                    propagateExceptionToWaiters(amqe);
+                    propagateExceptionToAllWaiters(amqe);
                 }
                 _connection.exceptionReceived(cause);
 
@@ -395,7 +413,7 @@
             // we notify the state manager of the error in case we have any clients waiting on a state
             // change. Those "waiters" will be interrupted and can handle the exception
             AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
-            propagateExceptionToWaiters(amqe);
+            propagateExceptionToAllWaiters(amqe);
             _connection.exceptionReceived(cause);
         }
     }
@@ -405,11 +423,33 @@
      * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
      * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
      *
+     * This should be called only when the exception is fatal for the connection.
+     *
      * @param e the exception to propagate
+     *
+     * @see #propagateExceptionToFrameListeners
+     * @see #propagateExceptionToStateWaiters
      */
-    public void propagateExceptionToWaiters(Exception e)
+    public void propagateExceptionToAllWaiters(Exception e)
+    {
+        propagateExceptionToFrameListeners(e);
+        propagateExceptionToStateWaiters(e);
+    }
+
+    /**
+     * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any
+     * protocol level waits.
+     *
+     * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
+     * stop waiting and relinquish the Failover lock {@see FailoverHandler}.
+     *
+     * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt
+     * their protocol request and so listen again for the correct frame.
+     *
+     * @param e the exception to propagate
+     */
+    public void propagateExceptionToFrameListeners(Exception e)
     {
-        
         if (!_frameListeners.isEmpty())
         {
             final Iterator it = _frameListeners.iterator();
@@ -421,6 +461,22 @@
         }
     }
 
+    /**
+     * This caters for the case where we only need to propogate an exception to the the state manager to interupt any
+     * thing waiting for a state change.
+     *
+     * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement.
+     *
+     * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal
+     * cases {@link #propagateExceptionToAllWaiters} would be the correct choice.
+     *
+     * @param e the exception to propagate
+     */
+    public void propagateExceptionToStateWaiters(Exception e)
+    {
+        getStateManager().error(e);
+    }
+
     public void notifyFailoverStarting()
     {
         // Set the last exception in the sync block to ensure the ordering with add.
@@ -431,7 +487,9 @@
             _lastFailoverException = new FailoverException("Failing over about to start");
         }
 
-        propagateExceptionToWaiters(_lastFailoverException);
+        //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
+        // interupted unless failover cannot restore the state.
+        propagateExceptionToFrameListeners(_lastFailoverException);
     }
 
     public void failoverInProgress()
@@ -443,6 +501,11 @@
 
     public void messageReceived(IoSession session, Object message) throws Exception
     {
+        if (PROTOCOL_DEBUG)
+        {
+            _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+        }
+
         if(message instanceof AMQFrame)
         {
             final boolean debug = _logger.isDebugEnabled();
@@ -459,7 +522,7 @@
 
             HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
 
-            bodyFrame.handle(frame.getChannel(),_protocolSession);
+            bodyFrame.handle(frame.getChannel(), _protocolSession);
 
             _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
         }
@@ -508,20 +571,12 @@
             if (!wasAnyoneInterested)
             {
                 throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
-                                       + _frameListeners, null);
+                                             + _frameListeners, null);
             }
         }
         catch (AMQException e)
-        {            
-            if (!_frameListeners.isEmpty())
-            {
-                Iterator it = _frameListeners.iterator();
-                while (it.hasNext())
-                {
-                    final AMQMethodListener listener = (AMQMethodListener) it.next();
-                    listener.error(e);
-                }
-            }
+        {
+            propagateExceptionToFrameListeners(e);
 
             exceptionCaught(session, e);
         }
@@ -532,6 +587,11 @@
 
     public void messageSent(IoSession session, Object message) throws Exception
     {
+        if (PROTOCOL_DEBUG)
+        {
+            _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
+        }
+        
         final long sentMessages = _messagesOut++;
 
         final boolean debug = _logger.isDebugEnabled();
@@ -542,34 +602,13 @@
         }
 
         _connection.bytesSent(session.getWrittenBytes());
-        if (debug)
-        {
-            _logger.debug("Sent frame " + message);
-        }
-    }
-
-    /*
-      public void addFrameListener(AMQMethodListener listener)
-      {
-          _frameListeners.add(listener);
-      }
-
-      public void removeFrameListener(AMQMethodListener listener)
-      {
-          _frameListeners.remove(listener);
-      }
-     */
-    public void attainState(AMQState s) throws AMQException
-    {
-        getStateManager().attainState(s);
     }
 
-    public AMQState attainState(Set<AMQState> states) throws AMQException
+    public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
     {
-        return getStateManager().attainState(states);
+        return getStateManager().createWaiter(states);
     }
 
-
     /**
      * Convenience method that writes a frame to the protocol session. Equivalent to calling
      * getProtocolSession().write().
@@ -617,14 +656,12 @@
                 {
                     throw _lastFailoverException;
                 }
-                
+
                 _frameListeners.add(listener);
             }
             _protocolSession.writeFrame(frame);
 
-            AMQMethodEvent e = listener.blockForFrame(timeout);
-
-            return e;
+            return listener.blockForFrame(timeout);
             // When control resumes before this line, a reply will have been received
             // that matches the criteria defined in the blocking listener
         }
@@ -669,8 +706,7 @@
         getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
         ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                new AMQShortString("JMS client is closing the connection."),0,0);
-
+                                                                                                  new AMQShortString("JMS client is closing the connection."), 0, 0);
 
         final AMQFrame frame = body.generateFrame(0);
 
@@ -745,10 +781,6 @@
     public void setStateManager(AMQStateManager stateManager)
     {
         _stateManager = stateManager;
-        if (_protocolSession != null)
-        {
-            _protocolSession.setStateManager(stateManager);
-        }
     }
 
     public AMQProtocolSession getProtocolSession()
@@ -778,7 +810,7 @@
 
     public MethodRegistry getMethodRegistry()
     {
-        return getStateManager().getMethodRegistry();
+        return _protocolSession.getMethodRegistry();
     }
 
     public ProtocolVersion getProtocolVersion()

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Aug 14 20:40:49 2008
@@ -30,7 +30,6 @@
 
 import javax.jms.JMSException;
 import javax.security.sasl.SaslClient;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -38,13 +37,14 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.message.ReturnMessage;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Sender;
 import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
 
 /**
@@ -67,8 +67,6 @@
 
     protected final IoSession _minaProtocolSession;
 
-    private AMQStateManager _stateManager;
-
     protected WriteFuture _lastWriteFuture;
 
     /**
@@ -86,7 +84,7 @@
      * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
      * first) with the subsequent content header and content bodies.
      */
-    private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+    private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
     private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
 
     /** Counter to ensure unique queue names */
@@ -97,26 +95,17 @@
 //    private VersionSpecificRegistry _registry =
 //        MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
 
-
     private MethodRegistry _methodRegistry =
             MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
 
-
     private MethodDispatcher _methodDispatcher;
 
+    protected final AMQConnection _connection;
 
-    private final AMQConnection _connection;
     private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
     {
-        this(protocolHandler, protocolSession, connection, new AMQStateManager());
-
-    }
-
-    public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection,
-        AMQStateManager stateManager)
-    {
         _protocolHandler = protocolHandler;
         _minaProtocolSession = protocolSession;
         _minaProtocolSession.setAttachment(this);
@@ -124,20 +113,27 @@
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
         // fixme - real value needed
         _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-        _stateManager = stateManager;
-        _stateManager.setProtocolSession(this);
         _protocolVersion = connection.getProtocolVersion();
         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
-                                                                 stateManager);
+                                                                           this);
         _connection = connection;
 
     }
 
+    public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+    {
+        _protocolHandler = protocolHandler;
+        _minaProtocolSession = null;
+        _protocolVersion = connection.getProtocolVersion();
+        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+                                                                           this);
+        _connection = connection;
+    }
+
     public void init()
     {
         // start the process of setting up the connection. This is the first place that
         // data is written to the server.
-
         _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
     }
 
@@ -161,14 +157,7 @@
 
     public AMQStateManager getStateManager()
     {
-        return _stateManager;
-    }
-
-    public void setStateManager(AMQStateManager stateManager)
-    {
-        _stateManager = stateManager;
-        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion,
-                                                                 stateManager);         
+        return _protocolHandler.getStateManager();
     }
 
     public String getVirtualHost()
@@ -193,7 +182,7 @@
 
     public SaslClient getSaslClient()
     {
-        return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+        return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);    
     }
 
     /**
@@ -235,12 +224,11 @@
      *
      * @throws AMQException if this was not expected
      */
-    public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
-    {
-        final int channelId = message.getChannelId();
-        if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+    public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
+    {        
+        if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
         {
-            _channelId2UnprocessedMsgArray[channelId] = message;    
+            _channelId2UnprocessedMsgArray[channelId] = message;
         }
         else
         {
@@ -250,18 +238,17 @@
 
     public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
     {
-        final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
-                                                               : _channelId2UnprocessedMsgMap.get(channelId);
-
+        final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+                                               : _channelId2UnprocessedMsgMap.get(channelId));
 
         if (msg == null)
         {
-            throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null);
+            throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null);
         }
 
         if (msg.getContentHeader() != null)
         {
-            throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames", null);
+            throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null);
         }
 
         msg.setContentHeader(contentHeader);
@@ -275,7 +262,7 @@
     {
         UnprocessedMessage_0_8 msg;
         final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
-        if(fastAccess)
+        if (fastAccess)
         {
             msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
         }
@@ -291,7 +278,7 @@
 
         if (msg.getContentHeader() == null)
         {
-            if(fastAccess)
+            if (fastAccess)
             {
                 _channelId2UnprocessedMsgArray[channelId] = null;
             }
@@ -302,15 +289,7 @@
             throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
         }
 
-        /*try
-        {*/
         msg.receiveBody(contentBody);
-        /*}
-        catch (UnexpectedBodyReceivedException e)
-        {
-            _channelId2UnprocessedMsgMap.remove(channelId);
-            throw e;
-        }*/
 
         if (msg.isAllBodyDataReceived())
         {
@@ -333,7 +312,7 @@
     {
         AMQSession session = getSession(channelId);
         session.messageReceived(msg);
-        if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+        if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
         {
             _channelId2UnprocessedMsgArray[channelId] = null;
         }
@@ -431,12 +410,12 @@
         return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
     }
 
-    public void closeProtocolSession()
+    public void closeProtocolSession() throws AMQException
     {
         closeProtocolSession(true);
     }
 
-    public void closeProtocolSession(boolean waitLast)
+    public void closeProtocolSession(boolean waitLast) throws AMQException
     {
         _logger.debug("Waiting for last write to join.");
         if (waitLast && (_lastWriteFuture != null))
@@ -445,7 +424,15 @@
         }
 
         _logger.debug("Closing protocol session");
+        
         final CloseFuture future = _minaProtocolSession.close();
+
+        // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
+        // then wait for the connection to close.
+        // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
+        // error now shouldn't matter.
+
+        _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
         future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
     }
 
@@ -482,16 +469,16 @@
     {
         final AMQSession session = getSession(channelId);
 
-        session.confirmConsumerCancelled(consumerTag);
+        session.confirmConsumerCancelled(consumerTag.toIntValue());
     }
 
     public void setProtocolVersion(final ProtocolVersion pv)
     {
         _protocolVersion = pv;
         _methodRegistry = MethodRegistry.getMethodRegistry(pv);
-        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager);
+        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
 
-      //  _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+        //  _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
     }
 
     public byte getProtocolMinorVersion()
@@ -524,12 +511,12 @@
         return _methodDispatcher;
     }
 
-
     public void setTicket(int ticket, int channelId)
     {
         final AMQSession session = getSession(channelId);
         session.setTicket(ticket);
     }
+
     public void setMethodDispatcher(MethodDispatcher methodDispatcher)
     {
         _methodDispatcher = methodDispatcher;
@@ -545,4 +532,14 @@
     {
         _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
     }
+
+    public void notifyError(Exception error)
+    {
+        _protocolHandler.propagateExceptionToAllWaiters(error);
+    }
+
+    public void setSender(Sender<java.nio.ByteBuffer> sender)
+    {
+        // No-op, interface munging
+    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Thu Aug 14 20:40:49 2008
@@ -20,9 +20,14 @@
  */
 package org.apache.qpid.client.protocol;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.util.BlockingWaiter;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -54,38 +59,17 @@
  * </table>
  *
  * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
- *       methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
- *       seem to use it. So wrapping the listeners is possible.
- *
- * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener,
- *       overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot
- *       behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for
- *       method has been received.
- *
- * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull
- *       for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
- *       when this happens. At the very least, restore the interrupted status flag.
- *
+ * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
+ * seem to use it. So wrapping the listeners is possible.
  * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
- *       check that SynchronousQueue has a non-blocking put method available.
+ * check that SynchronousQueue has a non-blocking put method available.
  */
-public abstract class BlockingMethodFrameListener implements AMQMethodListener
+public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
 {
-    /** This flag is used to indicate that the blocked for method has been received. */
-    private volatile boolean _ready = false;
-
-    /** Used to protect the shared event and ready flag between the producer and consumer. */
-    private final Object _lock = new Object();
-
-    /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
-    private volatile Exception _error;
 
     /** Holds the channel id for the channel upon which this listener is waiting for a response. */
     protected int _channelId;
 
-    /** Holds the incoming method. */
-    protected AMQMethodEvent _doneEvt = null;
-
     /**
      * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
      *
@@ -104,7 +88,14 @@
      *
      * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
      */
-    public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException;
+    public abstract boolean processMethod(int channelId, AMQMethodBody frame);
+
+    public boolean process(AMQMethodEvent evt)
+    {
+        AMQMethodBody method = evt.getMethod();
+
+        return (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
+    }
 
     /**
      * Informs this listener that an AMQP method has been received.
@@ -113,37 +104,9 @@
      *
      * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
      */
-    public boolean methodReceived(AMQMethodEvent evt) // throws AMQException
+    public boolean methodReceived(AMQMethodEvent evt)
     {
-        AMQMethodBody method = evt.getMethod();
-
-        /*try
-        {*/
-        boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
-
-        if (ready)
-        {
-            // we only update the flag from inside the synchronized block
-            // so that the blockForFrame method cannot "miss" an update - it
-            // will only ever read the flag from within the synchronized block
-            synchronized (_lock)
-            {
-                _doneEvt = evt;
-                _ready = ready;
-                _lock.notify();
-            }
-        }
-
-        return ready;
-
-        /*}
-        catch (AMQException e)
-        {
-            error(e);
-            // we rethrow the error here, and the code in the frame dispatcher will go round
-            // each listener informing them that an exception has been thrown
-            throw e;
-        }*/
+        return received(evt);
     }
 
     /**
@@ -159,75 +122,15 @@
      */
     public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
     {
-        synchronized (_lock)
+        try
         {
-            while (!_ready)
-            {
-                try
-                {
-                    if (timeout == -1)
-                    {
-                        _lock.wait();
-                    }
-                    else
-                    {
-
-                        _lock.wait(timeout);
-                        if (!_ready)
-                        {
-                            _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
-                            _ready = true;
-                        }
-                    }
-                }
-                catch (InterruptedException e)
-                {
-                    // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
-                    // if (!_ready && timeout != -1)
-                    // {
-                    // _error = new AMQException("Server did not respond timely");
-                    // _ready = true;
-                    // }
-                }
-            }
+            return (AMQMethodEvent) block(timeout);
         }
-
-        if (_error != null)
+        finally
         {
-            if (_error instanceof AMQException)
-            {
-                throw (AMQException) _error;
-            }
-            else if (_error instanceof FailoverException)
-            {
-                // This should ensure that FailoverException is not wrapped and can be caught.
-                throw (FailoverException) _error; // needed to expose FailoverException.
-            }
-            else
-            {
-                throw new AMQException(null, "Woken up due to " + _error.getClass(), _error);
-            }
+            //Prevent any more errors being notified to this waiter.
+            close();
         }
-
-        return _doneEvt;
     }
 
-    /**
-     * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
-     * class to avoid code repetition but again is only called by the MINA dispatcher thread.
-     *
-     * @param e
-     */
-    public void error(Exception e)
-    {
-        // set the error so that the thread that is blocking (against blockForFrame())
-        // can pick up the exception and rethrow to the caller
-        _error = e;
-
-        synchronized (_lock)
-        {
-            _ready = true;
-            _lock.notify();
-        }
-    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Aug 14 20:40:49 2008
@@ -28,15 +28,28 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
- * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
- * there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/>
+ * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager.
+ *
+ * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler} and that is the sole point of reference so that
+ * As the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around.
+ *
+ * The StateManager works by any component can wait for a state change to occur by using the following sequence.
+ *
+ * <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states);
+ * <li> // Perform action that will cause state change
+ * <li>waiter.await();
+ *
+ * The two step process is required as there is an inherit race condition between starting a process that will cause
+ * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
+ * that any asynchrous errors that occur can be delivered to the correct waiters.
  */
-public class AMQStateManager 
+public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
 
@@ -45,16 +58,13 @@
     /** The current state */
     private AMQState _currentState;
 
-
-    /**
-     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
-     * AMQFrame.
-     */
-
-
     private final Object _stateLock = new Object();
+
     private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
 
+    protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>();
+    private Exception _lastException;
+
     public AMQStateManager()
     {
         this(null);
@@ -62,18 +72,15 @@
 
     public AMQStateManager(AMQProtocolSession protocolSession)
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
+        this(AMQState.CONNECTION_NOT_STARTED, protocolSession);
     }
 
-    protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
+    protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession)
     {
         _protocolSession = protocolSession;
         _currentState = state;
-
     }
 
-
-
     public AMQState getCurrentState()
     {
         return _currentState;
@@ -86,107 +93,107 @@
         synchronized (_stateLock)
         {
             _currentState = newState;
-            _stateLock.notifyAll();
+
+            _logger.debug("Notififying State change to " + _waiters.size() + " : " + _waiters);
+
+            for (StateWaiter waiter : _waiters)
+            {
+                waiter.received(newState);
+            }
         }
     }
 
-
     public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
     {
-
         B method = evt.getMethod();
-        
+
         //    StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
         method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
         return true;
     }
 
-
-    public void attainState(final AMQState s) throws AMQException
+    /**
+     * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+     *
+     * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
+     * connection to the network.
+     *
+     * @param session The new protocol session
+     */
+    public void setProtocolSession(AMQProtocolSession session)
     {
-        synchronized (_stateLock)
+        if (_logger.isInfoEnabled())
         {
-            final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
-            long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
-            while ((_currentState != s) && (waitTime > 0))
-            {
-                try
-                {
-                    _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
-                }
-                catch (InterruptedException e)
-                {
-                    _logger.warn("Thread interrupted");
-                }
-
-                if (_currentState != s)
-                {
-                    waitTime = waitUntilTime - System.currentTimeMillis();
-                }
-            }
-
-            if (_currentState != s)
-            {
-                _logger.warn("State not achieved within permitted time.  Current state " + _currentState
-                             + ", desired state: " + s);
-                throw new AMQException(null, "State not achieved within permitted time.  Current state " + _currentState
-                                             + ", desired state: " + s, null);
-            }
+            _logger.info("Setting ProtocolSession:" + session);
         }
-
-        // at this point the state will have changed.
+        _protocolSession = session;
     }
 
-    public AMQProtocolSession getProtocolSession()
+    /**
+     * Propogate error to waiters
+     *
+     * @param error The error to propogate.
+     */
+    public void error(Exception error)
     {
-        return _protocolSession;
+        if (_waiters.size() == 0)
+        {
+            _logger.error("No Waiters for error saving as last error:" + error.getMessage());
+            _lastException = error;
+        }
+        for (StateWaiter waiter : _waiters)
+        {
+            _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage());
+            waiter.error(error);
+        }
     }
 
-    public void setProtocolSession(AMQProtocolSession session)
+    /**
+     * This provides a single place that the maximum time for state change to occur can be accessed.
+     * It is currently set via System property amqj.MaximumStateWait
+     *
+     * @return long Milliseconds value for a timeout
+     */
+    public long getWaitTimeout()
     {
-        _protocolSession = session;
+        return MAXIMUM_STATE_WAIT_TIME;
     }
 
-    public MethodRegistry getMethodRegistry()
+    /**
+     * Create and add a new waiter to the notifcation list.
+     *
+     * @param states The waiter will attempt to wait for one of these desired set states to be achived.
+     *
+     * @return the created StateWaiter.
+     */
+    public StateWaiter createWaiter(Set<AMQState> states)
     {
-        return getProtocolSession().getMethodRegistry();
+        final StateWaiter waiter;
+        synchronized (_stateLock)
+        {
+            waiter = new StateWaiter(this, _currentState, states);
+
+            _waiters.add(waiter);
+        }
+
+        return waiter;
     }
 
-    public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+    /**
+     * Remove the waiter from the notification list.
+     *
+     * @param waiter The waiter to remove.
+     */
+    public void removeWaiter(StateWaiter waiter)
     {
         synchronized (_stateLock)
         {
-            final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
-            long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
-            while (!stateSet.contains(_currentState) && (waitTime > 0))
-            {
-                try
-                {
-                    _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
-                }
-                catch (InterruptedException e)
-                {
-                    _logger.warn("Thread interrupted");
-                }
-
-                if (!stateSet.contains(_currentState))
-                {
-                    waitTime = waitUntilTime - System.currentTimeMillis();
-                }
-            }
-
-            if (!stateSet.contains(_currentState))
-            {
-                _logger.warn("State not achieved within permitted time.  Current state " + _currentState
-                             + ", desired state: " + stateSet);
-                throw new AMQException(null, "State not achieved within permitted time.  Current state " + _currentState
-                                       + ", desired state: " + stateSet, null);
-            }
-            return _currentState;
+            _waiters.remove(waiter);
         }
+    }
 
-
+    public Exception getLastException()
+    {
+        return _lastException;
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java Thu Aug 14 20:40:49 2008
@@ -33,6 +33,6 @@
 public interface StateAwareMethodListener<B extends AMQMethodBody>
 {
 
-    void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException;
+    void methodReceived(AMQProtocolSession session, B body, int channelId) throws AMQException;
 
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Thu Aug 14 20:40:49 2008
@@ -20,103 +20,110 @@
  */
 package org.apache.qpid.client.state;
 
+import org.apache.qpid.client.util.BlockingWaiter;
+import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.AMQException;
-
-import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Set;
 
 /**
- * Waits for a particular state to be reached.
+ * This is an implementation of the {@link BlockingWaiter} to provide error handing and a waiting mechanism for state
+ * changes.
+ *
+ * On construction the current state and a set of States to await for is provided.
+ *
+ * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * a desired state then await() returns immediately.
+ *
+ * Otherwise it will block for the set timeout for a desired state to be achieved.
+ *
+ * The state changes are notified via the {@link #process} method.
+ *
+ * Any notified error is handled by the BlockingWaiter and thrown from the {@link #block} method.
+ *
  */
-public class StateWaiter implements StateListener
+public class StateWaiter extends BlockingWaiter<AMQState>
 {
     private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
 
-    private final AMQState _state;
-
-    private volatile boolean _newStateAchieved;
-
-    private volatile Throwable _throwable;
-
-    private final Object _monitor = new Object();
-    private static final long TIME_OUT = 1000 * 60 * 2;
-
-    public StateWaiter(AMQState state)
+    Set<AMQState> _awaitStates;
+    private AMQState _startState;
+    private AMQStateManager _stateManager;
+
+    /**
+     *
+     * @param stateManager The StateManager
+     * @param currentState
+     * @param awaitStates
+     */
+    public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates)
     {
-        _state = state;
+        _logger.info("New StateWaiter :" + currentState + ":" + awaitStates);
+        _stateManager = stateManager;
+        _awaitStates = awaitStates;
+        _startState = currentState;
     }
 
-    public void waituntilStateHasChanged() throws AMQException
+    /**
+     * When the state is changed this StateWaiter is notified to process the change.
+     *
+     * @param state The new state that has been achieved.
+     * @return
+     */
+    public boolean process(AMQState state)
     {
-        synchronized (_monitor)
-        {
-            //
-            // The guard is required in case we are woken up by a spurious
-            // notify().
-            //
-            while (!_newStateAchieved && (_throwable == null))
-            {
-                try
-                {
-                    _logger.debug("State " + _state + " not achieved so waiting...");
-                    _monitor.wait(TIME_OUT);
-                    // fixme this won't cause the timeout to exit the loop. need to set _throwable
-                }
-                catch (InterruptedException e)
-                {
-                    _logger.debug("Interrupted exception caught while waiting: " + e, e);
-                }
-            }
-        }
+        return _awaitStates.contains(state);
+    }
 
-        if (_throwable != null)
-        {
-            _logger.debug("Throwable reached state waiter: " + _throwable);
-            if (_throwable instanceof AMQException)
-            {
-                throw (AMQException) _throwable;
-            }
-            else
-            {
-                throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught.
-            }
-        }
+    /**
+     * Await for the requried State to be achieved within the default timeout.
+     * @return The achieved state that was requested.
+     * @throws AMQException The exception that prevented the required state from being achived.
+     */
+    public AMQState await() throws AMQException
+    {
+        return await(_stateManager.getWaitTimeout());
     }
 
-    public void stateChanged(AMQState oldState, AMQState newState)
+    /**
+     * Await for the requried State to be achieved.
+     *
+     * <b>It is the responsibility of this class to remove the waiter from the StateManager
+     *
+     * @param timeout The time in milliseconds to wait for any of the states to be achived.
+     * @return The achieved state that was requested.
+     * @throws AMQException The exception that prevented the required state from being achived.
+     */
+    public AMQState await(long timeout) throws AMQException
     {
-        synchronized (_monitor)
+        try
         {
-            if (_logger.isDebugEnabled())
+            if (process(_startState))
             {
-                _logger.debug("stateChanged called changing from :" + oldState + " to :" + newState);
+                return _startState;
             }
 
-            if (_state == newState)
+            try
             {
-                _newStateAchieved = true;
-
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("New state reached so notifying monitor");
-                }
+                return (AMQState) block(timeout);
+            }
+            catch (FailoverException e)
+            {
+                _logger.error("Failover occured whilst waiting for states:" + _awaitStates);
 
-                _monitor.notifyAll();
+                e.printStackTrace();
+                return null;
             }
         }
-    }
-
-    public void error(Throwable t)
-    {
-        synchronized (_monitor)
+        finally
         {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("exceptionThrown called");
-            }
+            //Prevent any more errors being notified to this waiter.
+            close();
 
-            _throwable = t;
-            _monitor.notifyAll();
+            //Remove the waiter from the notifcation list in the statee manager
+            _stateManager.removeWaiter(this);
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java Thu Aug 14 20:40:49 2008
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.client.state.listener;
 
-import org.apache.qpid.AMQException;
+
 import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
 import org.apache.qpid.framing.AMQMethodBody;
 
@@ -34,7 +34,7 @@
         _expectedClass = expectedClass;
     }
 
-    public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
+    public boolean processMethod(int channelId, AMQMethodBody frame)
     {
         return _expectedClass.isInstance(frame);
     }