You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by to...@apache.org on 2007/05/18 02:51:15 UTC

svn commit: r539198 [2/2] - in /incubator/qpid/trunk/qpid: ./ dotnet/Qpid.Buffer.Tests/ dotnet/Qpid.Buffer/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/BrokerDetails/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Messages/ dotnet/Qp...

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs Thu May 17 17:51:12 2007
@@ -25,93 +25,121 @@
 
 namespace Qpid.Client.Message
 {
-    public class MessageFactoryRegistry
-    {
-        private readonly Hashtable _mimeToFactoryMap = new Hashtable();
-
-        public void RegisterFactory(string mimeType, IMessageFactory mf)
-        {    
-            if (mf == null)
-            {
-                throw new ArgumentNullException("Message factory");
-            }
-            if (mimeType == null)
-            {
-                throw new ArgumentNullException("mf");
-            }
-            _mimeToFactoryMap[mimeType] = mf;
-        }
-
-        public void DeregisterFactory(string mimeType)
-        {
-            _mimeToFactoryMap.Remove(mimeType);
-        }
-
-        /// <summary>
-        /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate
-        /// concrete message type.
-        /// </summary>
-        /// <param name="messageNbr">the AMQ message id</param>
-        /// <param name="redelivered">true if redelivered</param>
-        /// <param name="contentHeader">the content header that was received</param>
-        /// <param name="bodies">a list of ContentBody instances</param>
-        /// <returns>the message.</returns>
-        /// <exception cref="AMQException"/>
-        /// <exception cref="QpidException"/>
-        public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
-                                                ContentHeaderBody contentHeader,
-                                                IList bodies)
-        {
-            BasicContentHeaderProperties properties =  (BasicContentHeaderProperties) contentHeader.Properties;
-
-            if (properties.ContentType == null)
-            {
-                properties.ContentType = "";
-            }
-
-            IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[properties.ContentType];
-            if (mf == null)
-            {
-                throw new AMQException("Unsupport MIME type of " + properties.ContentType);
-            }
-            else
-            {
-                return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies);
-            }
-        }
-
-        public AbstractQmsMessage CreateMessage(string mimeType)
-        {
-            if (mimeType == null)
-            {
-                throw new ArgumentNullException("Mime type must not be null");
-            }
-            IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[mimeType];
-            if (mf == null)
-            {
-                throw new AMQException("Unsupport MIME type of " + mimeType);
-            }
-            else
-            {
-                return mf.CreateMessage();
-            }
-        }
-
-        /// <summary>
-        /// Construct a new registry with the default message factories registered
-        /// </summary>
-        /// <returns>a message factory registry</returns>
-        public static MessageFactoryRegistry NewDefaultRegistry()
-        {
-            MessageFactoryRegistry mf = new MessageFactoryRegistry();
-            mf.RegisterFactory("text/plain", new QpidTextMessageFactory());
-            mf.RegisterFactory("text/xml", new QpidTextMessageFactory());
-            mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory());
-            // TODO: use bytes message for default message factory            
-            // MJA - just added this bit back in...
-            mf.RegisterFactory("", new QpidBytesMessageFactory());
-            return mf;
-        }
-    }
+   public class MessageFactoryRegistry
+   {
+      private readonly Hashtable _mimeToFactoryMap = new Hashtable();
+      private IMessageFactory _defaultFactory;
+
+      /// <summary>
+      /// Default factory to use for unknown message types
+      /// </summary>
+      public IMessageFactory DefaultFactory
+      {
+         get { return _defaultFactory; }
+         set { _defaultFactory = value; }
+      }
+
+      /// <summary>
+      /// Register a new message factory for a MIME type
+      /// </summary>
+      /// <param name="mimeType">Mime type to register</param>
+      /// <param name="mf">Factory that creates messages of that MIME type</param>
+      /// <exception cref="ArgumentNullException">if mf is null, or mimeType is null or empty</exception>
+      public void RegisterFactory(string mimeType, IMessageFactory mf)
+      {
+         if ( mf == null )
+            throw new ArgumentNullException("mf");
+         if ( mimeType == null || mimeType.Length == 0 )
+            throw new ArgumentNullException("mimeType");
+
+         _mimeToFactoryMap[mimeType] = mf;
+      }
+
+      /// <summary>
+      /// Remove a message factory
+      /// </summary>
+      /// <param name="mimeType">MIME type to unregister</param>
+      public void DeregisterFactory(string mimeType)
+      {
+         _mimeToFactoryMap.Remove(mimeType);
+      }
+
+      /// <summary>
+      /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+      /// concrete message type.
+      /// </summary>
+      /// <param name="messageNbr">the AMQ message id</param>
+      /// <param name="redelivered">true if redelivered</param>
+      /// <param name="contentHeader">the content header that was received</param>
+      /// <param name="bodies">a list of ContentBody instances</param>
+      /// <returns>the message.</returns>
+      /// <exception cref="AMQException"/>
+      /// <exception cref="QpidException"/>
+      public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
+                                              ContentHeaderBody contentHeader,
+                                              IList bodies)
+      {
+         BasicContentHeaderProperties properties = (BasicContentHeaderProperties)contentHeader.Properties;
+
+         if ( properties.ContentType == null )
+         {
+            properties.ContentType = "";
+         }
+
+         IMessageFactory mf = GetFactory(properties.ContentType);
+         return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies);
+      }
+
+      /// <summary>
+      /// Create a new message of the specified type
+      /// </summary>
+      /// <param name="mimeType">The Mime type</param>
+      /// <returns>The new message</returns>
+      /// <exception cref="AMQException">if there is neither a registered nor a default factory</exception>
+      public AbstractQmsMessage CreateMessage(string mimeType)
+      {
+         if ( mimeType == null || mimeType.Length == 0 )
+            throw new ArgumentNullException("mimeType");
+
+         IMessageFactory mf = GetFactory(mimeType);
+         return mf.CreateMessage(mimeType);
+      }
+
+      /// <summary>
+      /// Construct a new registry with the default message factories registered
+      /// </summary>
+      /// <returns>a message factory registry</returns>
+      public static MessageFactoryRegistry NewDefaultRegistry()
+      {
+         MessageFactoryRegistry mf = new MessageFactoryRegistry();
+         mf.RegisterFactory("text/plain", new QpidTextMessageFactory());
+         mf.RegisterFactory("text/xml", new QpidTextMessageFactory());
+         mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory());
+
+         mf.DefaultFactory = new QpidBytesMessageFactory();
+         return mf;
+      }
+
+      /// <summary>
+      /// Find the factory registered for a MIME type, falling back to
+      /// the default factory when none is registered
+      /// </summary>
+      /// <param name="mimeType">MIME type to look up</param>
+      /// <returns>The factory to use, never null</returns>
+      /// <exception cref="AMQException">if there is neither a registered nor a default factory</exception>
+      private IMessageFactory GetFactory(string mimeType)
+      {
+         IMessageFactory mf = (IMessageFactory)_mimeToFactoryMap[mimeType];
+         if ( mf == null )
+            mf = _defaultFactory;
+
+         // DefaultFactory is optional: fail with a clear error here
+         // rather than a NullReferenceException in the caller
+         if ( mf == null )
+            throw new AMQException("Unsupported MIME type of " + mimeType);
+
+         return mf;
+      }
+   }
 }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs Thu May 17 17:51:12 2007
@@ -43,8 +43,6 @@
 
     public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage
     {
-        private const string MIME_TYPE = "application/octet-stream";
-
         private const int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
 
         public QpidBytesMessage() : this(null)
@@ -59,7 +57,6 @@
         QpidBytesMessage(ByteBuffer data) : base(data)
         {
             // superclass constructor has instantiated a content header at this point
-            ContentHeaderProperties.ContentType = MIME_TYPE;
             if (data == null)
             {
                 _data = ByteBuffer.Allocate(DEFAULT_BUFFER_INITIAL_SIZE);
@@ -71,7 +68,6 @@
             // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
             : base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data)
         {
-            ContentHeaderProperties.ContentType = MIME_TYPE;        
         }
 
         public override void ClearBodyImpl()
@@ -113,14 +109,6 @@
                 byte[] data = new byte[_data.Remaining];
                 _data.GetBytes(data);
                 return Encoding.UTF8.GetString(data);
-            }
-        }
-
-        public override string MimeType
-        {
-            get
-            {
-                return MIME_TYPE;
             }
         }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs Thu May 17 17:51:12 2007
@@ -62,9 +62,11 @@
             return new QpidBytesMessage(deliveryTag, contentHeader, data);
         }
 
-        public override AbstractQmsMessage CreateMessage()
+        public override AbstractQmsMessage CreateMessage(string mimeType)
         {
-            return new QpidBytesMessage();
+            QpidBytesMessage msg = new QpidBytesMessage();
+            msg.ContentType = mimeType;
+            return msg;
         }
 
     }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs Thu May 17 17:51:12 2007
@@ -28,16 +28,10 @@
          _headers.Clear();
       }
 
-      public string this[string name]
+      public object this[string name]
       {
-         get
-         {
-            return GetString(name);
-         }
-         set
-         {
-            SetString(name, value);
-         }
+         get { return GetObject(name); }
+         set { SetObject(name, value); }
       }
 
       public bool GetBoolean(string name)
@@ -165,6 +159,18 @@
       {
          CheckPropertyName(propertyName);
          _headers.SetString(propertyName, value);
+      }
+
+      public object GetObject(string propertyName)
+      {
+         CheckPropertyName(propertyName);
+         return _headers[propertyName];
+      }
+
+      public void SetObject(string propertyName, object value)
+      {
+         CheckPropertyName(propertyName);
+         _headers[propertyName] = value;
       }
 
       private static void CheckPropertyName(string propertyName)

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs Thu May 17 17:51:12 2007
@@ -28,25 +28,22 @@
 {
     public class QpidTextMessage : AbstractQmsMessage, ITextMessage
     {
-        private const string MIME_TYPE = "text/plain";
-
         private string _decodedValue = null;
+        private static Encoding DEFAULT_ENCODING = Encoding.UTF8;
 
         internal QpidTextMessage() : this(null, null)
         {
+           ContentEncoding = DEFAULT_ENCODING.BodyName;
         }
 
-        QpidTextMessage(ByteBuffer data, String encoding) : base(data)
+        internal QpidTextMessage(ByteBuffer data, String encoding) : base(data)
         {
-            ContentHeaderProperties.ContentType = MIME_TYPE;
-            ContentHeaderProperties.Encoding = encoding;
+            ContentEncoding = encoding;
         }
 
         internal QpidTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
             :base(deliveryTag, contentHeader, data)
         {
-            contentHeader.ContentType = MIME_TYPE;
-            _data = data; // FIXME: Unnecessary - done in base class ctor.
         }
 
         public override void ClearBodyImpl()
@@ -64,14 +61,6 @@
             return Text;
         }
 
-        public override string MimeType
-        {
-            get
-            {
-                return MIME_TYPE;
-            }
-        }        
-
         public string Text
         {
             get
@@ -100,7 +89,7 @@
                     }
                     else
                     {
-                        _decodedValue = Encoding.Default.GetString(bytes);
+                        _decodedValue = DEFAULT_ENCODING.GetString(bytes);
                     }
                     return _decodedValue;                    
                 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs Thu May 17 17:51:12 2007
@@ -25,9 +25,11 @@
 {
     public class QpidTextMessageFactory : AbstractQmsMessageFactory
     {        
-        public override AbstractQmsMessage CreateMessage()
+        public override AbstractQmsMessage CreateMessage(string mimeType)
         {
-            return new QpidTextMessage();
+            QpidTextMessage msg = new QpidTextMessage();
+            msg.ContentType = mimeType;
+            return msg;
         }
 
         protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader)

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs Thu May 17 17:51:12 2007
@@ -43,7 +43,7 @@
             Bodies.Add(body);
             if (body.Payload != null)
             {
-                _bytesReceived += (uint)body.Payload.Length;
+                _bytesReceived += (uint)body.Payload.Remaining;
             }
         }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs Thu May 17 17:51:12 2007
@@ -171,19 +171,10 @@
                 sb.Append('?');
                 foreach (String key in options.Keys)
                 {
-                    sb.Append(key);
-
-                    sb.Append("='");
-
-                    sb.Append(options[key]);
-
-                    sb.Append("'");
-                    sb.Append(DEFAULT_OPTION_SEPERATOR);
+                    sb.AppendFormat("{0}='{1}'{2}", key, options[key], DEFAULT_OPTION_SEPERATOR);
                 }
 
                 sb.Remove(sb.Length - 1, 1);
-                //                sb.deleteCharAt(sb.length() - 1);
-
                 return sb.ToString();
             }
         }
@@ -358,9 +349,10 @@
 
     public class QpidConnectionInfo : IConnectionInfo
     {
+        const string DEFAULT_VHOST = "/";
         string _username = "guest";
         string _password = "guest";
-        string _virtualHost = "/";
+        string _virtualHost = DEFAULT_VHOST;
 
         string _failoverMethod = null;
         IDictionary _failoverOptions = new Hashtable();
@@ -385,15 +377,51 @@
 
         public string AsUrl()
         {
-            string result = "amqp://";
-            foreach (IBrokerInfo info in _brokerInfos)
+            StringBuilder sb = new StringBuilder();
+            sb.AppendFormat("{0}://", ConnectionUrlConstants.AMQ_PROTOCOL);
+
+            if (_username != null)
             {
-                result += info.ToString();
+                sb.Append(_username);
+                if (_password != null)
+                {
+                    sb.AppendFormat(":{0}", _password);
+                }
+                sb.Append("@");
             }
-            return result;
 
+            sb.Append(_clientName);
+            sb.Append(_virtualHost);
+            sb.Append(OptionsToString());
+
+            return sb.ToString();
         }
 
+        /// <summary>Builds the URL option section: '?', the quoted broker list and, when a failover method is set, that method with its options.</summary>
+        private String OptionsToString()
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.AppendFormat("?{0}='", ConnectionUrlConstants.OPTIONS_BROKERLIST);
+
+            string separator = ""; // ';' only between brokers, so an empty list keeps its opening quote
+            foreach (IBrokerInfo broker in _brokerInfos)
+            {
+                sb.AppendFormat("{0}{1}", separator, broker);
+                separator = ";";
+            }
+            sb.Append("'");
+
+            if (_failoverMethod != null)
+            {
+                sb.AppendFormat("{0}{1}='{2}{3}'", URLHelper.DEFAULT_OPTION_SEPERATOR,
+                    ConnectionUrlConstants.OPTIONS_FAILOVER, _failoverMethod,
+                    URLHelper.printOptions((Hashtable)_failoverOptions));
+            }
+
+            return sb.ToString();
+        }
+
+
         public string FailoverMethod
         {
             get { return _failoverMethod; }
@@ -449,7 +477,13 @@
         public string VirtualHost
         {
             get { return _virtualHost; }
-            set { _virtualHost = value; }
+            set { 
+               _virtualHost = value;
+               if ( _virtualHost == null || _virtualHost.Length == 0 )
+                  _virtualHost = DEFAULT_VHOST;
+               if ( _virtualHost[0] != '/' )
+                  _virtualHost = '/' + _virtualHost;
+            }
         }
 
         public string GetOption(string key)

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs Thu May 17 17:51:12 2007
@@ -65,6 +65,8 @@
             IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler();
             IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler();
             IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler();
+            IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler();
+            IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler();
             
             // We need to register a map for the null (i.e. all state) handlers otherwise you get
             // a stack overflow in the handler searching code when you present it with a frame for which
@@ -96,6 +98,8 @@
                 open[typeof(ConnectionCloseBody)] = connectionClose;
                 open[typeof(BasicDeliverBody)] = basicDeliver;
                 open[typeof(BasicReturnBody)] = basicReturn;
+                open[typeof(QueueDeleteOkBody)] = queueDeleteOk;
+                open[typeof(QueuePurgeOkBody)] = queuePurgeOk;
                 _state2HandlersMap[AMQState.CONNECTION_OPEN] = open;
             }
             {

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs Thu May 17 17:51:12 2007
@@ -33,35 +33,39 @@
         // Warning: don't use this log for regular logging.
         static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
         
-        IByteChannel byteChannel;
-        IProtocolEncoder encoder;
-        IProtocolDecoder decoder;
-
-        public AmqpChannel(IByteChannel byteChannel)
-        {
-            this.byteChannel = byteChannel;    
+        IByteChannel _byteChannel;
+        IProtocolEncoder _encoder;
+        IProtocolDecoder _decoder;
+        IProtocolDecoderOutput _decoderOutput;
+        private object _syncLock;
+
+        public AmqpChannel(IByteChannel byteChannel, IProtocolDecoderOutput decoderOutput)
+        {
+            _byteChannel = byteChannel;
+            _decoderOutput = decoderOutput;
+            _syncLock = new object();
             
             AMQProtocolProvider protocolProvider = new AMQProtocolProvider();
             IProtocolCodecFactory factory = protocolProvider.CodecFactory;
-            encoder = factory.Encoder;
-            decoder = factory.Decoder;
+            _encoder = factory.Encoder;
+            _decoder = factory.Decoder;
         }
 
-        public Queue Read()
+        public void Read()
         {
-            ByteBuffer buffer = byteChannel.Read();
-            return DecodeAndTrace(buffer);
+            ByteBuffer buffer = _byteChannel.Read();
+            Decode(buffer);
         }
         
         public IAsyncResult BeginRead(AsyncCallback callback, object state)
         {
-           return byteChannel.BeginRead(callback, state);
+           return _byteChannel.BeginRead(callback, state);
         }
 
-        public Queue EndRead(IAsyncResult result)
+        public void EndRead(IAsyncResult result)
         {
-           ByteBuffer buffer = byteChannel.EndRead(result);
-           return DecodeAndTrace(buffer);
+           ByteBuffer buffer = _byteChannel.EndRead(result);
+           Decode(buffer);
         }
 
         public void Write(IDataBlock o)
@@ -74,43 +78,32 @@
             // we should be doing an async write, but apparently
             // the mentalis library doesn't queue async read/writes
             // correctly and throws random IOException's. Stay sync for a while
-            //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
-            byteChannel.Write(Encode(o));
+            //_byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
+            _byteChannel.Write(Encode(o));
         }
 
         private void OnAsyncWriteDone(IAsyncResult result)
         {
-           byteChannel.EndWrite(result);
+           _byteChannel.EndWrite(result);
         }
 
-        private Queue DecodeAndTrace(ByteBuffer buffer)
+        private void Decode(ByteBuffer buffer)
         {
-           Queue frames = Decode(buffer);
-
-           // TODO: Refactor to decorator.
-           if ( _protocolTraceLog.IsDebugEnabled )
+           // make sure we don't try to decode more than
+           // one buffer at the same time
+           lock ( _syncLock )
            {
-              foreach ( object o in frames )
-              {
-                 _protocolTraceLog.Debug(String.Format("READ {0}", o));
-              }
+              _decoder.Decode(buffer, _decoderOutput);
            }
-           return frames;
         }
 
         private ByteBuffer Encode(object o)
         {
             SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput();
-            encoder.Encode(o, output);
+            _encoder.Encode(o, output);
             return output.buffer;
         }
 
-        private Queue Decode(ByteBuffer byteBuffer)
-        {
-            SimpleProtocolDecoderOutput outx = new SimpleProtocolDecoderOutput();
-            decoder.Decode(byteBuffer, outx);
-            return outx.MessageQueue;
-        }
     }
 }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs Thu May 17 17:51:12 2007
@@ -25,8 +25,8 @@
 {
     public interface IProtocolChannel : IProtocolWriter
     {
-        Queue Read();
+        void Read();
         IAsyncResult BeginRead(AsyncCallback callback, object state);
-        Queue EndRead(IAsyncResult result);
+        void EndRead(IAsyncResult result);
     }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs Thu May 17 17:51:12 2007
@@ -24,6 +24,7 @@
 using System.Threading;
 using Qpid.Client.Qms;
 using Qpid.Client.Protocol;
+using Qpid.Codec;
 using Qpid.Framing;
 
 namespace Qpid.Client.Transport.Socket.Blocking
@@ -66,7 +67,11 @@
          _ioHandler = MakeBrokerConnection(broker, connection);
          // todo: get default read size from config!
 
-         _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+         IProtocolDecoderOutput decoderOutput =
+            new ProtocolDecoderOutput(_protocolListener);
+         _amqpChannel = 
+            new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput);
+
          // post an initial async read
          _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
       }
@@ -117,22 +122,28 @@
       {
          try
          {
-            Queue frames = _amqpChannel.EndRead(result);
+            _amqpChannel.EndRead(result);
 
-            // process results
-            foreach ( IDataBlock dataBlock in frames )
-            {
-               _protocolListener.OnMessage(dataBlock);
-            }
-            // if we're not stopping, post a read again
             bool stopping = _stopEvent.WaitOne(0, false);
             if ( !stopping )
                _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
          } catch ( Exception e )
          {
-            _protocolListener.OnException(e);
+            // ignore any errors during closing
+            bool stopping = _stopEvent.WaitOne(0, false);
+            if ( !stopping )
+               _protocolListener.OnException(e);
          }
       }
+
+      #region IProtocolDecoderOutput Members
+
+      public void Write(object message)
+      {
+         _protocolListener.OnMessage((IDataBlock)message);
+      }
+
+      #endregion
    }
 }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj Thu May 17 17:51:12 2007
@@ -50,6 +50,8 @@
     <Compile Include="Client\AMQNoConsumersException.cs" />
     <Compile Include="Client\AMQNoRouteException.cs" />
     <Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
+    <Compile Include="Client\Handler\QueueDeleteOkMethodHandler.cs" />
+    <Compile Include="Client\Handler\QueuePurgeOkMethodHandler.cs" />
     <Compile Include="Client\SslOptions.cs" />
     <Compile Include="Client\Message\QpidHeaders.cs" />
     <Compile Include="Client\QpidConnectionInfo.cs" />
@@ -110,6 +112,7 @@
     <Compile Include="Client\Transport\IProtocolChannel.cs" />
     <Compile Include="Client\Transport\IProtocolWriter.cs" />
     <Compile Include="Client\Transport\ITransport.cs" />
+    <Compile Include="Client\Transport\ProtocolDecoderOutput.cs" />
     <Compile Include="Client\Transport\SingleProtocolEncoderOutput.cs" />
     <Compile Include="Client\Transport\Socket\Blocking\BlockingSocketTransport.cs" />
     <Compile Include="Client\Transport\Socket\Blocking\ByteChannel.cs" />

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs Thu May 17 17:51:12 2007
@@ -19,121 +19,134 @@
  *
  */
 using System;
+using log4net;
 using Qpid.Buffer;
 
 namespace Qpid.Codec
 {
-    public abstract class CumulativeProtocolDecoder : IProtocolDecoder
-    {
-        ByteBuffer _remaining;
-
-        /// <summary>
-        /// Creates a new instance with the 4096 bytes initial capacity of
-        /// cumulative buffer.
-        /// </summary>
-        protected CumulativeProtocolDecoder()
-        {
-            _remaining = ByteBuffer.Allocate(4096);
-            _remaining.IsAutoExpand = true;
-        }
-
-        /// <summary>
-        /// Cumulates content of <tt>in</tt> into internal buffer and forwards
-        /// decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
-        /// <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
-        /// and the cumulative buffer is compacted after decoding ends.
-        /// </summary>
-        /// <exception cref="Exception">
-        /// if your <tt>doDecode()</tt> returned <tt>true</tt> not consuming the cumulative buffer.
-        /// </exception>
-        public void Decode(ByteBuffer input, IProtocolDecoderOutput output)
-        {
-            if (_remaining.Position != 0) // If there were remaining undecoded bytes
-            {
-                DecodeRemainingAndInput(input, output);
-            }
-            else
-            {
-                DecodeInput(input, output);
-            }
-        }
-
-        private void DecodeInput(ByteBuffer input, IProtocolDecoderOutput output)
-        {
-            // Just decode the input buffer and remember any remaining undecoded bytes.
-            try
-            {
-                DecodeAll(input, output);
-            }
-            finally
-            {
-                if (input.HasRemaining)
-                {
-                    _remaining.Put(input);
-                }
-            }
-        }
-
-        private void DecodeRemainingAndInput(ByteBuffer input, IProtocolDecoderOutput output)
-        {
-            // Concatenate input buffer with left-over bytes.
-            _remaining.Put(input);
-            _remaining.Flip();
-
-            try
-            {
-                DecodeAll(_remaining, output);
-            }
-            finally
-            {
-                _remaining.Compact();
-            }
-        }
-
-        private void DecodeAll(ByteBuffer buf, IProtocolDecoderOutput output)
-        {
-            for (;;)
-            {
-                int oldPos = buf.Position;
-                bool decoded = DoDecode(buf, output);
-                if (decoded)
-                {                        
-                    if (buf.Position == oldPos)
-                    {
-                        throw new Exception(
-                            "doDecode() can't return true when buffer is not consumed.");
-                    }
-
-                    if (!buf.HasRemaining)
-                    {
-                        break;
-                    }
-                }
-                else
-                {
-                    break;
-                }
-            }
-        }
-
-        /// <summary>
-        /// Implement this method to consume the specified cumulative buffer and
-        /// decode its content into message(s). 
-        /// </summary>
-        /// <param name="input">the cumulative buffer</param>
-        /// <param name="output">decoder output</param>
-        /// <returns>
-        /// <tt>true</tt> if and only if there's more to decode in the buffer
-        /// and you want to have <tt>doDecode</tt> method invoked again.
-        /// Return <tt>false</tt> if remaining data is not enough to decode,
-        /// then this method will be invoked again when more data is cumulated.
-        /// </returns>
-        /// <exception cref="Exception">If cannot decode</exception>
-        protected abstract bool DoDecode(ByteBuffer input, IProtocolDecoderOutput output);
-
-        public void Dispose()
-        {
-            _remaining = null;
-        }
-    }
+   public abstract class CumulativeProtocolDecoder : IProtocolDecoder
+   {
+      static ILog _logger = LogManager.GetLogger(typeof(CumulativeProtocolDecoder));
+
+      ByteBuffer _remaining;
+
+      /// <summary>
+      /// Creates a new instance with the 4096 bytes initial capacity of
+      /// cumulative buffer.
+      /// </summary>
+      protected CumulativeProtocolDecoder()
+      {
+         _remaining = AllocateBuffer();
+      }
+
+      /// <summary>
+      /// Cumulates content of <tt>input</tt> into internal buffer and forwards
+      /// decoding request to <see cref="DoDecode"/>.
+      /// <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+      /// and the cumulative buffer is compacted after decoding ends.
+      /// </summary>
+      /// <exception cref="Exception">
+      /// if your <tt>doDecode()</tt> returned <tt>true</tt> not consuming the cumulative buffer.
+      /// </exception>
+      public void Decode(ByteBuffer input, IProtocolDecoderOutput output)
+      {
+         if ( _remaining.Position != 0 ) // If there were remaining undecoded bytes
+         {
+            DecodeRemainingAndInput(input, output);
+         } else
+         {
+            DecodeInput(input, output);
+         }
+      }
+
+      private void DecodeInput(ByteBuffer input, IProtocolDecoderOutput output)
+      {
+         _logger.Debug(string.Format("DecodeInput: input {0}", input.Remaining));
+         // Just decode the input buffer and remember any remaining undecoded bytes.
+         try
+         {
+            DecodeAll(input, output);
+         } finally
+         {
+            if ( input.HasRemaining )
+            {
+               _remaining.Put(input);
+            }
+         }
+      }
+
+      private void DecodeRemainingAndInput(ByteBuffer input, IProtocolDecoderOutput output)
+      {
+         _logger.Debug(string.Format("DecodeRemainingAndInput: input {0}, remaining {1}", input.Remaining, _remaining.Position));
+         // Replace the _remaining buffer, so that we can leave the
+         // original one alone. Necessary because some consumers slice
+         // the buffer and do not consume it until later, causing
+         // a race condition if we compact it too soon.
+         ByteBuffer newRemainding = AllocateBuffer();
+         ByteBuffer temp = _remaining;
+         _remaining = newRemainding;
+         temp.Put(input);
+         temp.Flip();
+         try
+         {
+            DecodeAll(temp, output);
+         } finally
+         {
+            if ( temp.Remaining > 0 )
+               _remaining.Put(temp);
+         }
+      }
+
+      private void DecodeAll(ByteBuffer buf, IProtocolDecoderOutput output)
+      {
+         for ( ; ; )
+         {
+            int oldPos = buf.Position;
+            bool decoded = DoDecode(buf, output);
+            if ( decoded )
+            {
+               if ( buf.Position == oldPos )
+               {
+                  throw new Exception(
+                      "doDecode() can't return true when buffer is not consumed.");
+               }
+
+               if ( !buf.HasRemaining )
+               {
+                  break;
+               }
+            } else
+            {
+               break;
+            }
+         }
+      }
+
+      /// <summary>
+      /// Implement this method to consume the specified cumulative buffer and
+      /// decode its content into message(s). 
+      /// </summary>
+      /// <param name="input">the cumulative buffer</param>
+      /// <param name="output">decoder output</param>
+      /// <returns>
+      /// <tt>true</tt> if and only if there's more to decode in the buffer
+      /// and you want to have <tt>doDecode</tt> method invoked again.
+      /// Return <tt>false</tt> if remaining data is not enough to decode,
+      /// then this method will be invoked again when more data is cumulated.
+      /// </returns>
+      /// <exception cref="Exception">If cannot decode</exception>
+      protected abstract bool DoDecode(ByteBuffer input, IProtocolDecoderOutput output);
+
+      public void Dispose()
+      {
+         _remaining = null;
+      }
+
+      private ByteBuffer AllocateBuffer()
+      {
+         ByteBuffer buffer = ByteBuffer.Allocate(4096);
+         buffer.IsAutoExpand = true;
+         return buffer;
+      }
+   }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs Thu May 17 17:51:12 2007
@@ -25,144 +25,266 @@
 
 namespace Qpid.Framing
 {
-    public class BasicContentHeaderProperties : IContentHeaderProperties
-    {
-        private static readonly ILog _log = LogManager.GetLogger(typeof(BasicContentHeaderProperties));
+   public class BasicContentHeaderProperties : IContentHeaderProperties
+   {
+      private static readonly ILog _log = LogManager.GetLogger(typeof(BasicContentHeaderProperties));
+
+      private string _contentType;
+      private string _encoding;
+      private FieldTable _headers;
+      private byte _deliveryMode;
+      private byte _priority;
+      private string _correlationId;
+      private long _expiration;
+      private string _replyTo;
+      private string _messageId;
+      private ulong _timestamp;
+      private string _type;
+      private string _userId;
+      private string _appId;
+      private string _clusterId;
+
+
+      #region Properties
+      //
+      // Properties
+      //
+
+      /// <summary>
+      /// The MIME Content Type
+      /// </summary>
+      public string ContentType
+      {
+         get { return _contentType; }
+         set { _contentType = value; }
+      }
+
+      /// <summary>
+      /// The MIME Content Encoding
+      /// </summary>
+      public string Encoding
+      {
+         get { return _encoding; }
+         set { _encoding = value; }
+      }
+
+      /// <summary>
+      /// Message headers
+      /// </summary>
+      public FieldTable Headers
+      {
+         get { return _headers; }
+         set { _headers = value; }
+      }
+
+      /// <summary>
+      /// Non-persistent (1) or persistent (2)
+      /// </summary>
+      public byte DeliveryMode
+      {
+         get { return _deliveryMode; }
+         set { _deliveryMode = value; }
+      }
+
+      /// <summary>
+      /// The message priority, 0 to 9
+      /// </summary>
+      public byte Priority
+      {
+         get { return _priority; }
+         set { _priority = value; }
+      }
+
+      /// <summary>
+      /// The application correlation identifier
+      /// </summary>
+      public string CorrelationId
+      {
+         get { return _correlationId; }
+         set { _correlationId = value; }
+      }
+
+      /// <summary>
+      /// Message expiration specification
+      /// </summary>
+      // TODO: Should be string according to spec
+      public long Expiration
+      {
+         get { return _expiration; }
+         set { _expiration = value; }
+      }
+
+      /// <summary>
+      /// The destination to reply to
+      /// </summary>
+      public string ReplyTo
+      {
+         get { return _replyTo; }
+         set { _replyTo = value; }
+      }
+
+      /// <summary>
+      /// The application message identifier
+      /// </summary>
+      public string MessageId
+      {
+         get { return _messageId; }
+         set { _messageId = value; }
+      }
+
+      /// <summary>
+      /// The message timestamp
+      /// </summary>
+      public ulong Timestamp
+      {
+         get { return _timestamp; }
+         set { _timestamp = value; }
+      }
+
+      /// <summary>
+      /// The message type name
+      /// </summary>
+      public string Type
+      {
+         get { return _type; }
+         set { _type = value; }
+      }
+
+      /// <summary>
+      /// The creating user id
+      /// </summary>
+      public string UserId
+      {
+         get { return _userId; }
+         set { _userId = value; }
+      }
+
+      /// <summary>
+      /// The creating application id
+      /// </summary>
+      public string AppId
+      {
+         get { return _appId; }
+         set { _appId = value; }
+      }
+
+      /// <summary>
+      /// Intra-cluster routing identifier
+      /// </summary>
+      public string ClusterId
+      {
+         get { return _clusterId; }
+         set { _clusterId = value; }
+      }
+
+      #endregion // Properties
+
+
+      public BasicContentHeaderProperties()
+      {
+      }
+
+      public uint PropertyListSize
+      {
+         get
+         {
+            return (uint)(EncodingUtils.EncodedShortStringLength(ContentType) +
+                   EncodingUtils.EncodedShortStringLength(Encoding) +
+                   EncodingUtils.EncodedFieldTableLength(Headers) +
+                   1 + 1 +
+                   EncodingUtils.EncodedShortStringLength(CorrelationId) +
+                   EncodingUtils.EncodedShortStringLength(ReplyTo) +
+                   EncodingUtils.EncodedShortStringLength(String.Format("{0:D}", Expiration)) +
+                   EncodingUtils.EncodedShortStringLength(MessageId) +
+                   8 +
+                   EncodingUtils.EncodedShortStringLength(Type) +
+                   EncodingUtils.EncodedShortStringLength(UserId) +
+                   EncodingUtils.EncodedShortStringLength(AppId) +
+                   EncodingUtils.EncodedShortStringLength(ClusterId));
+
+         }
+      }
+
+      public ushort PropertyFlags
+      {
+         get
+         {
+            int value = 0;
 
-        public string ContentType;
-
-        public string Encoding;
-
-        public FieldTable Headers;
-
-        public byte DeliveryMode;
-
-        public byte Priority;
-
-        public string CorrelationId;
-
-        public long Expiration;
-
-        public string ReplyTo;
-
-        public string MessageId;
-
-        public ulong Timestamp;
-
-        public string Type;
-
-        public string UserId;
-
-        public string AppId;
-
-        public string ClusterId;
-        
-        public BasicContentHeaderProperties()
-        {
-        }
-
-        public uint PropertyListSize
-        {
-            get
-            {
-                return (uint)(EncodingUtils.EncodedShortStringLength(ContentType) +
-                       EncodingUtils.EncodedShortStringLength(Encoding) +
-                       EncodingUtils.EncodedFieldTableLength(Headers) +
-                       1 + 1 +
-                       EncodingUtils.EncodedShortStringLength(CorrelationId) +
-                       EncodingUtils.EncodedShortStringLength(ReplyTo) +
-                       EncodingUtils.EncodedShortStringLength(String.Format("{0:D}", Expiration)) +
-                       EncodingUtils.EncodedShortStringLength(MessageId) +
-                       8 +
-                       EncodingUtils.EncodedShortStringLength(Type) +
-                       EncodingUtils.EncodedShortStringLength(UserId) +
-                       EncodingUtils.EncodedShortStringLength(AppId) +
-                       EncodingUtils.EncodedShortStringLength(ClusterId));
-                
-            }
-        }
-
-        public ushort PropertyFlags
-        {   
-            get
-            {
-                int value = 0;
-
-                // for now we just blast in all properties
-                for (int i = 0; i < 14; i++)
-                {
-                    value += (1 << (15-i));
-                }
-                return (ushort) value;
-            }
-        }
-
-        public void WritePropertyListPayload(ByteBuffer buffer)
-        {
-            EncodingUtils.WriteShortStringBytes(buffer, ContentType);
-            EncodingUtils.WriteShortStringBytes(buffer, Encoding);
-            EncodingUtils.WriteFieldTableBytes(buffer, Headers);
-            buffer.Put(DeliveryMode);
-            buffer.Put(Priority);
-            EncodingUtils.WriteShortStringBytes(buffer, CorrelationId);
-            EncodingUtils.WriteShortStringBytes(buffer, ReplyTo);
-            EncodingUtils.WriteShortStringBytes(buffer, String.Format("{0:D}", Expiration));
-            EncodingUtils.WriteShortStringBytes(buffer, MessageId);            
-            buffer.Put(Timestamp);            
-            EncodingUtils.WriteShortStringBytes(buffer, Type);
-            EncodingUtils.WriteShortStringBytes(buffer, UserId);
-            EncodingUtils.WriteShortStringBytes(buffer, AppId);
-            EncodingUtils.WriteShortStringBytes(buffer, ClusterId);
-        }
-
-        public void PopulatePropertiesFromBuffer(ByteBuffer buffer, ushort propertyFlags) 
-        {
-            _log.Debug("Property flags: " + propertyFlags);
-            if ((propertyFlags & (1 << 15)) > 0)
-                ContentType = EncodingUtils.ReadShortString(buffer);
-            if ((propertyFlags & (1 << 14)) > 0)
-                Encoding = EncodingUtils.ReadShortString(buffer);
-            if ((propertyFlags & (1 << 13)) > 0)
-                Headers = EncodingUtils.ReadFieldTable(buffer);
-            if ((propertyFlags & (1 << 12)) > 0)
-                DeliveryMode = buffer.GetByte();
-            if ((propertyFlags & (1 << 11)) > 0)
-                Priority = buffer.GetByte();
-            if ((propertyFlags & (1 << 10)) > 0)
-                CorrelationId = EncodingUtils.ReadShortString(buffer);
-            if ((propertyFlags & (1 << 9)) > 0)
-                ReplyTo = EncodingUtils.ReadShortString(buffer);
-            if ((propertyFlags & (1 << 8)) > 0)
-                Expiration = EncodingUtils.ReadLongAsShortString(buffer);
-            if ((propertyFlags & (1 << 7)) > 0)
-                MessageId = EncodingUtils.ReadShortString(buffer);
-            if ((propertyFlags & (1 << 6)) > 0)
-                Timestamp = buffer.GetUInt64();            
-            if ((propertyFlags & (1 << 5)) > 0)
-                Type = EncodingUtils.ReadShortString(buffer);
-            if ((propertyFlags & (1 << 4)) > 0)
-                UserId = EncodingUtils.ReadShortString(buffer);
-            if ((propertyFlags & (1 << 3)) > 0)
-                AppId = EncodingUtils.ReadShortString(buffer);
-            if ((propertyFlags & (1 << 2)) > 0)
-                ClusterId = EncodingUtils.ReadShortString(buffer);
-        }
-
-        public void SetDeliveryMode(DeliveryMode deliveryMode)
-        {
-            if (deliveryMode == Messaging.DeliveryMode.NonPersistent)
+            // for now we just blast in all properties
+            for ( int i = 0; i < 14; i++ )
             {
-                DeliveryMode = 1;
+               value += (1 << (15 - i));
             }
-            else
-            {
-                DeliveryMode = 2;
-            }
-        }
-
-        public override string ToString()
-        {
-            return "Properties: " + ContentType + " " + Encoding + " " + Timestamp + " " + Type;
-        }
-    }
+            return (ushort)value;
+         }
+      }
+
+      public void WritePropertyListPayload(ByteBuffer buffer)
+      {
+         EncodingUtils.WriteShortStringBytes(buffer, ContentType);
+         EncodingUtils.WriteShortStringBytes(buffer, Encoding);
+         EncodingUtils.WriteFieldTableBytes(buffer, Headers);
+         buffer.Put(DeliveryMode);
+         buffer.Put(Priority);
+         EncodingUtils.WriteShortStringBytes(buffer, CorrelationId);
+         EncodingUtils.WriteShortStringBytes(buffer, ReplyTo);
+         EncodingUtils.WriteShortStringBytes(buffer, String.Format("{0:D}", Expiration));
+         EncodingUtils.WriteShortStringBytes(buffer, MessageId);
+         buffer.Put(Timestamp);
+         EncodingUtils.WriteShortStringBytes(buffer, Type);
+         EncodingUtils.WriteShortStringBytes(buffer, UserId);
+         EncodingUtils.WriteShortStringBytes(buffer, AppId);
+         EncodingUtils.WriteShortStringBytes(buffer, ClusterId);
+      }
+
+      public void PopulatePropertiesFromBuffer(ByteBuffer buffer, ushort propertyFlags)
+      {
+         _log.Debug("Property flags: " + propertyFlags);
+         if ( (propertyFlags & (1 << 15)) > 0 )
+            ContentType = EncodingUtils.ReadShortString(buffer);
+         if ( (propertyFlags & (1 << 14)) > 0 )
+            Encoding = EncodingUtils.ReadShortString(buffer);
+         if ( (propertyFlags & (1 << 13)) > 0 )
+            Headers = EncodingUtils.ReadFieldTable(buffer);
+         if ( (propertyFlags & (1 << 12)) > 0 )
+            DeliveryMode = buffer.GetByte();
+         if ( (propertyFlags & (1 << 11)) > 0 )
+            Priority = buffer.GetByte();
+         if ( (propertyFlags & (1 << 10)) > 0 )
+            CorrelationId = EncodingUtils.ReadShortString(buffer);
+         if ( (propertyFlags & (1 << 9)) > 0 )
+            ReplyTo = EncodingUtils.ReadShortString(buffer);
+         if ( (propertyFlags & (1 << 8)) > 0 )
+            Expiration = EncodingUtils.ReadLongAsShortString(buffer);
+         if ( (propertyFlags & (1 << 7)) > 0 )
+            MessageId = EncodingUtils.ReadShortString(buffer);
+         if ( (propertyFlags & (1 << 6)) > 0 )
+            Timestamp = buffer.GetUInt64();
+         if ( (propertyFlags & (1 << 5)) > 0 )
+            Type = EncodingUtils.ReadShortString(buffer);
+         if ( (propertyFlags & (1 << 4)) > 0 )
+            UserId = EncodingUtils.ReadShortString(buffer);
+         if ( (propertyFlags & (1 << 3)) > 0 )
+            AppId = EncodingUtils.ReadShortString(buffer);
+         if ( (propertyFlags & (1 << 2)) > 0 )
+            ClusterId = EncodingUtils.ReadShortString(buffer);
+      }
+
+      public void SetDeliveryMode(DeliveryMode deliveryMode)
+      {
+         if ( deliveryMode == Messaging.DeliveryMode.NonPersistent )
+         {
+            DeliveryMode = 1;
+         } else
+         {
+            DeliveryMode = 2;
+         }
+      }
+
+      public override string ToString()
+      {
+         return "Properties: " + ContentType + " " + Encoding + " " + Timestamp + " " + Type;
+      }
+   }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs Thu May 17 17:51:12 2007
@@ -26,11 +26,24 @@
     {
         public const byte TYPE = 3;
 
-        /// <summary>
-        /// 
-        /// </summary>
-        /// TODO: consider whether this should be a pointer into the ByteBuffer to avoid copying */
-        public byte[] Payload;
+        private ByteBuffer _payload;
+       
+        public ByteBuffer Payload
+        {
+           get { return _payload; }
+        }
+
+       public ContentBody()
+       {
+       }
+       public ContentBody(ByteBuffer payload)
+       {
+          PopulateFromBuffer(payload, (uint)payload.Remaining);
+       }
+       public ContentBody(ByteBuffer payload, uint length)
+       {
+          PopulateFromBuffer(payload, length);
+       }
 
         #region IBody Members
 
@@ -46,7 +59,7 @@
         {
             get
             {
-                return (ushort)(Payload == null ? 0 : Payload.Length);
+                return (ushort)(Payload == null ? 0 : Payload.Remaining);
             }
         }
 
@@ -55,6 +68,7 @@
             if (Payload != null)
             {
                 buffer.Put(Payload);
+                Payload.Rewind();
             }
         }
 
@@ -62,8 +76,9 @@
         {
             if (size > 0)
             {
-                Payload = new byte[size];
-                buffer.GetBytes(Payload);
+                _payload = buffer.Slice();
+                _payload.Limit = (int)size;
+                buffer.Skip((int)size);
             }
         }
 
@@ -75,6 +90,11 @@
             frame.Channel = channelId;
             frame.BodyFrame = body;
             return frame;
+        }
+
+        public override string ToString()
+        {
+            return string.Format("ContentBody [ Size: {0} ]", Size);
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs Thu May 17 17:51:12 2007
@@ -47,7 +47,6 @@
         /// </summary>
         /// <param name="buffer">the buffer from which to read data. The length byte must be read already</param>
         /// <param name="length">the length of the field table. Must be > 0.</param>
-        /// <exception cref="AMQFrameDecodingException">if there is an error decoding the table</exception>
         public FieldTable(ByteBuffer buffer, uint length) : this()
         {
            _encodedForm = buffer.Slice();
@@ -497,27 +496,18 @@
         
         private AMQTypedValue GetProperty(string name)
         {
-           lock ( _syncLock )
-           {
-              if ( _properties == null )
-              {
-                 if ( _encodedForm == null )
-                 {
-                    return null;
-                 } else
-                 {
-                    PopulateFromBuffer();
-                 }
-              }
-              return (AMQTypedValue) _properties[name];
-           }
+           InitMapIfNecessary();
+           return (AMQTypedValue) _properties[name];
         }
 
         private void PopulateFromBuffer()
         {
            try
            {
-              SetFromBuffer(_encodedForm, _encodedSize);
+              ByteBuffer buffer = _encodedForm;
+              _encodedForm = null;
+              if ( buffer != null )
+                 SetFromBuffer(buffer, _encodedSize);
            } catch ( AMQFrameDecodingException e )
            {
               _log.Error("Error decoding FieldTable in deferred decoding mode ", e);
@@ -598,7 +588,11 @@
         {
            if ( _encodedForm != null )
            {
-              buffer.Put(_encodedForm);
+              lock ( _syncLock )
+              {
+                 buffer.Put(_encodedForm);
+                 _encodedForm.Flip();
+              }
            } else if ( _properties != null )
            {
               foreach ( DictionaryEntry de in _properties )
@@ -629,6 +623,7 @@
                        _log.Debug("Buffer Position:" + buffer.Position +
                                   " Remaining:" + buffer.Remaining);
                     }
+                    throw;
                  }
               }
            }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs Thu May 17 17:51:12 2007
@@ -23,42 +23,135 @@
 namespace Qpid.Messaging
 {
     public delegate void MessageReceivedDelegate(IMessage msg);
-
+    
+    /// <summary>
+    /// Interface used to manipulate an AMQP channel. 
+    /// </summary>
+    /// <remarks>
+    /// You can create a channel by using the CreateChannel() method
+    /// of the connection object.
+    /// </remarks>
     public interface IChannel : IDisposable
     {
+        /// <summary>
+        /// Acknowledge mode for messages received
+        /// </summary>
         AcknowledgeMode AcknowledgeMode { get; }
+        /// <summary>
+        /// True if the channel should use transactions
+        /// </summary>
         bool Transacted { get; }
 
         /// <summary>
         /// Prefetch value to be used as the default for consumers created on this channel.
         /// </summary>
-        int DefaultPrefetch
-        {
-            get;
-            set;
-        }
-
+        int DefaultPrefetch { get; set; }
+        
+        /// <summary>
+        /// Declare a new exchange
+        /// </summary>
+        /// <param name="exchangeName">Name of the exchange</param>
+        /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param>
         void DeclareExchange(string exchangeName, string exchangeClass);
+        /// <summary>
+        /// Delete an existing exchange
+        /// </summary>
+        /// <param name="exchangeName">Name of the exchange</param>
         void DeleteExchange(string exchangeName);
 
+        /// <summary>
+        /// Declare a new queue with the specified set of arguments
+        /// </summary>
+        /// <param name="queueName">Name of the queue</param>
+        /// <param name="isDurable">True if the queue should be durable</param>
+        /// <param name="isExclusive">True if the queue should be exclusive to this channel</param>
+        /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param>
         void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete);
-        void DeleteQueue();
-
+        /// <summary>
+        /// Delete a queue with the specified arguments
+        /// </summary>
+        /// <param name="queueName">Name of the queue to delete</param>
+        /// <param name="ifUnused">If true, the queue will only be deleted if it has no consumers</param>
+        /// <param name="ifEmpty">If true, the queue will only be deleted if it has no messages</param>
+        /// <param name="noWait">If true, the server will not respond to the method</param>
+        void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait);
+        /// <summary>
+        /// Generate a new Unique name to use for a queue
+        /// </summary>
+        /// <returns>A name that is unique to this channel</returns>
         string GenerateUniqueName();
-        IFieldTable CreateFieldTable();
 
+        /// <summary>
+        /// Removes all messages from a queue
+        /// </summary>
+        /// <param name="queueName">Name of the queue to purge</param>
+        /// <param name="noWait">If true, the server will not respond to the method</param>
+        void PurgeQueue(string queueName, bool noWait);
+        
+        /// <summary>
+        /// Bind a queue to the specified exchange
+        /// </summary>
+        /// <param name="queueName">Name of queue to bind</param>
+        /// <param name="exchangeName">Name of exchange to bind to</param>
+        /// <param name="routingKey">Routing key</param>
         void Bind(string queueName, string exchangeName, string routingKey);
+        /// <summary>
+        /// Bind a queue to the specified exchange
+        /// </summary>
+        /// <param name="queueName">Name of queue to bind</param>
+        /// <param name="exchangeName">Name of exchange to bind to</param>
+        /// <param name="routingKey">Routing key</param>
+        /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param>
         void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args);
 
+        /// <summary>
+        /// Create a new empty message with no body
+        /// </summary>
+        /// <returns>The new message</returns>
         IMessage CreateMessage();
+        /// <summary>
+        /// Create a new message of the specified MIME type
+        /// </summary>
+        /// <param name="mimeType">The mime type to create</param>
+        /// <returns>The new message</returns>
+        IMessage CreateMessage(string mimeType);
+        /// <summary>
+        /// Creates a new message for bytes (application/octet-stream)
+        /// </summary>
+        /// <returns>The new message</returns>
         IBytesMessage CreateBytesMessage();
+        /// <summary>
+        /// Creates a new text message (text/plain) with empty content
+        /// </summary>
+        /// <returns>The new message</returns>
         ITextMessage CreateTextMessage();
+        /// <summary>
+        /// Creates a new text message (text/plain) with a body
+        /// </summary>
+        /// <param name="initialValue">Initial body of the message</param>
+        /// <returns>The new message</returns>
         ITextMessage CreateTextMessage(string initialValue);
 
         #region Consuming
-
+        
+        /// <summary>
+        /// Creates a new Consumer using the builder pattern
+        /// </summary>
+        /// <param name="queueName">Name of queue to receive messages from</param>
+        /// <returns>The builder object</returns>
         MessageConsumerBuilder CreateConsumerBuilder(string queueName);
 
+        /// <summary>
+        /// Creates a new consumer
+        /// </summary>
+        /// <param name="queueName">Name of queue to receive messages from</param>
+        /// <param name="prefetchLow">Low prefetch value</param>
+        /// <param name="prefetchHigh">High prefetch value</param>
+        /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param>
+        /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param>
+        /// <param name="durable">If true, create a durable subscription</param>
+        /// <param name="subscriptionName">Subscription name</param>
+        /// <returns>The new consumer</returns>
         IMessageConsumer CreateConsumer(string queueName,
                                         int prefetchLow, 
                                         int prefetchHigh,
@@ -66,15 +159,35 @@
                                         bool exclusive,
                                         bool durable,
                                         string subscriptionName);
-
+        
+        /// <summary>
+        /// Unsubscribe from a queue
+        /// </summary>
+        /// <param name="subscriptionName">Subscription name</param>
         void Unsubscribe(string subscriptionName);
 
         #endregion
 
         #region Publishing
 
+        /// <summary>
+        /// Creates a new message publisher using the builder pattern
+        /// </summary>
+        /// <returns>The builder object</returns>
         MessagePublisherBuilder CreatePublisherBuilder();
-
+        
+        /// <summary>
+        /// Creates a new message publisher
+        /// </summary>
+        /// <param name="exchangeName">Name of exchange to publish to</param>
+        /// <param name="routingKey">Routing key</param>
+        /// <param name="deliveryMode">Default delivery mode</param>
+        /// <param name="timeToLive">Default time-to-live (TTL) of messages</param>
+        /// <param name="immediate">If true, sent immediately</param>
+        /// <param name="mandatory">If true, the broker will return an error 
+        /// (as a connection exception) if the message cannot be delivered</param>
+        /// <param name="priority">Default message priority</param>
+        /// <returns>The new message publisher</returns>
         IMessagePublisher CreatePublisher(string exchangeName,
                                         string routingKey,
                                         DeliveryMode deliveryMode,
@@ -86,9 +199,18 @@
         #endregion
 
         #region Transactions
-
+        
+        /// <summary>
+        /// Recover after transaction failure
+        /// </summary>
         void Recover();
+        /// <summary>
+        /// Commit the transaction
+        /// </summary>
         void Commit();
+        /// <summary>
+        /// Roll back the transaction
+        /// </summary>
         void Rollback();
 
         #endregion

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IHeaders.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IHeaders.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IHeaders.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IHeaders.cs Thu May 17 17:51:12 2007
@@ -35,7 +35,7 @@
     {
         bool Contains(string name);
 
-        string this[string name] { get; set; }
+        object this[string name] { get; set; }
         
         bool GetBoolean(string name);
         void SetBoolean(string name, bool value);

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IMessage.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IMessage.cs Thu May 17 17:51:12 2007
@@ -22,25 +22,75 @@
 {   
     public interface IMessage
     {
-        string ContentType { get; set;}
-        string ContentEncoding { get; set; }
-        string CorrelationId { get; set; }
-        byte[] CorrelationIdAsBytes { get; set; }
-        DeliveryMode DeliveryMode { get; set; }
-        long Expiration { get; set; }
-        string MessageId { get; set; }
-        int Priority { get; set; }
+       /// <summary>
+       /// The MIME Content Type
+       /// </summary>
+       string ContentType { get; set;}
+       /// <summary>
+       /// The MIME Content Encoding
+       /// </summary>
+       string ContentEncoding { get; set; }
+       /// <summary>
+       /// The application correlation identifier
+       /// </summary>
+       string CorrelationId { get; set; }
+       /// <summary>
+       /// The application correlation identifier, as an array of bytes
+       /// </summary>
+       byte[] CorrelationIdAsBytes { get; set; }
+       /// <summary>
+       /// The delivery mode: non-persistent (1) or persistent (2)
+       /// </summary>
+       DeliveryMode DeliveryMode { get; set; }
+       /// <summary>
+       /// Message expiration specification
+       /// </summary>
+       long Expiration { get; set; }
+       /// <summary>
+       /// The application message identifier
+       /// </summary>
+       string MessageId { get; set; }
+       /// <summary>
+       /// The message priority, 0 to 9
+       /// </summary>
+       byte Priority { get; set; }
+       /// <summary>
+       /// True if the message has been redelivered
+       /// </summary>
         bool Redelivered { get; set; }
+        /// <summary>
+        /// Exchange name of the reply-to address
+        /// </summary>
         string ReplyToExchangeName { get; set; }
+        /// <summary>
+        /// Routing key of the reply-to address
+        /// </summary>
         string ReplyToRoutingKey { get; set; }
+        /// <summary>
+        /// The message timestamp
+        /// </summary>
         long Timestamp { get; set; }
+        /// <summary>
+        /// The message type name
+        /// </summary>
         string Type { get; set; }
+        /// <summary>
+        /// Message headers
+        /// </summary>
         IHeaders Headers { get; }
-
-        // XXX: UserId?
-        // XXX: AppId?
-        // XXX: ClusterId?
-
+        /// <summary>
+        /// The creating user id
+        /// </summary>
+        string UserId { get; set; }
+        /// <summary>
+        /// The creating application id
+        /// </summary>
+        string AppId { get; set; }
+        /// <summary>
+        /// Intra-cluster routing identifier
+        /// </summary>
+        string ClusterId { get; set; }
+        
         void Acknowledge();
         void ClearBody();
     }