You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/06/16 16:02:16 UTC

svn commit: r668164 [1/2] - in /incubator/qpid/branches/M2.1.x/dotnet: Qpid.Client/Client/ Qpid.Client/Client/Message/ Qpid.Integration.Tests/ Qpid.Integration.Tests/testcases/ Qpid.Messaging/

Author: aidan
Date: Mon Jun 16 07:02:16 2008
New Revision: 668164

URL: http://svn.apache.org/viewvc?rev=668164&view=rev
Log:
QPID-1104: Add an IMessage.Acknowledge(bool) so that only specific messages can be acknowledged, not all messages received on the Channel up to that point.

Qpid.Client/Client/Message/AbstractQmsMessage.cs, dotnet/Qpid.Messaging/IMessage.cs: add parameterised ack so that only certain messages can be ack'd

Qpid.Integration.Tests/Qpid.Integration.Tests.csproj: add new test case

Qpid.Integration.Tests/testcases/ClientAckTests.cs: new tests

Qpid.Client/Client/BasicMessageProducer.cs, Qpid.Messaging/IMessagePublisher.cs: Add Channel property exposing the channel that the producer belongs to

Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs: add SendMessages method

Added:
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs   (with props)
Modified:
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Messaging/IMessage.cs
    incubator/qpid/branches/M2.1.x/dotnet/Qpid.Messaging/IMessagePublisher.cs

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs?rev=668164&r1=668163&r2=668164&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/BasicMessageProducer.cs Mon Jun 16 07:02:16 2008
@@ -1,405 +1,407 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using log4net;
-using Apache.Qpid.Buffer;
-using Apache.Qpid.Client.Message;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Client
-{
-   public class BasicMessageProducer : Closeable, IMessagePublisher
-   {
-      protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
-
-      /// <summary>
-      /// If true, messages will not get a timestamp.
-      /// </summary>
-      private bool _disableTimestamps;
-
-      /// <summary>
-      /// Priority of messages created by this producer.
-      /// </summary>
-      private int _messagePriority;
-
-      /// <summary>
-      /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
-      /// </summary>
-      private long _timeToLive;
-
-      /// <summary>
-      /// Delivery mode used for this producer.
-      /// </summary>
-      private DeliveryMode _deliveryMode;
-
-      private bool _immediate;
-      private bool _mandatory;
-
-      string _exchangeName;
-      string _routingKey;
-
-      /// <summary>
-      /// Default encoding used for messages produced by this producer.
-      /// </summary>
-      private string _encoding;
-
-      /// <summary>
-      /// Default encoding used for message produced by this producer.
-      /// </summary>
-      private string _mimeType;
-
-      /// <summary>
-      /// True if this producer was created from a transacted session
-      /// </summary>
-      private bool _transacted;
-
-      private ushort _channelId;
-
-      /// <summary>
-      /// This is an id generated by the session and is used to tie individual producers to the session. This means we
-      /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
-      /// to the session so that when an error is propagated to the session it can close the producer (meaning that
-      /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
-      /// </summary>
-      private long _producerId;
-
-      /// <summary>
-      /// The session used to create this producer
-      /// </summary>
-      private AmqChannel _channel;
-
-      public BasicMessageProducer(string exchangeName, string routingKey,
-          bool transacted,
-          ushort channelId,
-          AmqChannel channel,
-          long producerId,
-          DeliveryMode deliveryMode,
-          long timeToLive,
-          bool immediate,
-          bool mandatory,
-          int priority)
-      {
-         _exchangeName = exchangeName;
-         _routingKey = routingKey;
-         _transacted = transacted;
-         _channelId = channelId;
-         _channel = channel;
-         _producerId = producerId;
-         _deliveryMode = deliveryMode;
-         _timeToLive = timeToLive;
-         _immediate = immediate;
-         _mandatory = mandatory;
-         _messagePriority = priority;
-
-         _channel.RegisterProducer(producerId, this);
-      }
-
-
-      #region IMessagePublisher Members
-
-      public DeliveryMode DeliveryMode
-      {
-         get
-         {
-            CheckNotClosed();
-            return _deliveryMode;
-         }
-         set
-         {
-            CheckNotClosed();
-            _deliveryMode = value;
-         }
-      }
-
-      public string ExchangeName
-      {
-         get { return _exchangeName; }
-      }
-
-      public string RoutingKey
-      {
-         get { return _routingKey; }
-      }
-
-      public bool DisableMessageID
-      {
-         get
-         {
-            throw new Exception("The method or operation is not implemented.");
-         }
-         set
-         {
-            throw new Exception("The method or operation is not implemented.");
-         }
-      }
-
-      public bool DisableMessageTimestamp
-      {
-         get
-         {
-            CheckNotClosed();
-            return _disableTimestamps;
-         }
-         set
-         {
-            CheckNotClosed();
-            _disableTimestamps = value;
-         }
-      }
-
-      public int Priority
-      {
-         get
-         {
-            CheckNotClosed();
-            return _messagePriority;
-         }
-         set
-         {
-            CheckNotClosed();
-            if ( value < 0 || value > 9 )
-            {
-               throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
-            }
-            _messagePriority = value;
-         }
-      }
-
-      public override void Close()
-      {
-         _logger.Debug("Closing producer " + this);
-         Interlocked.Exchange(ref _closed, CLOSED);
-         _channel.DeregisterProducer(_producerId);
-      }
-
-      public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
-      {
-         CheckNotClosed();
-         SendImpl(
-            _exchangeName, 
-            _routingKey, 
-            (AbstractQmsMessage)msg, 
-            deliveryMode, 
-            priority, 
-            (uint)timeToLive, 
-            _mandatory,
-            _immediate
-            );
-      }
-
-      public void Send(IMessage msg)
-      {
-         CheckNotClosed();
-         SendImpl(
-            _exchangeName, 
-            _routingKey, 
-            (AbstractQmsMessage)msg, 
-            _deliveryMode, 
-            _messagePriority, 
-            (uint)_timeToLive,
-            _mandatory, 
-            _immediate
-            );
-      }
-
-      // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
-      // to facilitate publishing messages to potentially non-existent recipients.
-      public void Send(IMessage msg, bool mandatory)
-      {
-         CheckNotClosed();
-         SendImpl(
-            _exchangeName, 
-            _routingKey, 
-            (AbstractQmsMessage)msg, 
-            _deliveryMode, 
-            _messagePriority, 
-            (uint)_timeToLive,
-            mandatory, 
-            _immediate
-            );
-      }
-
-      public long TimeToLive
-      {
-         get
-         {
-            CheckNotClosed();
-            return _timeToLive;
-         }
-         set
-         {
-            CheckNotClosed();
-            if ( value < 0 )
-            {
-               throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
-            }
-            _timeToLive = value;
-         }
-      }
-
-      #endregion
-
-      public string MimeType
-      {
-         get
-         {
-            CheckNotClosed();
-            return _mimeType;
-         }
-         set
-         {
-            CheckNotClosed();
-            _mimeType = value;
-         }
-      }
-
-      public string Encoding
-      {
-         get
-         {
-            CheckNotClosed();
-            return _encoding;
-         }
-         set
-         {
-            CheckNotClosed();
-            _encoding = value;
-         }
-      }
-
-      public void Dispose()
-      {
-         Close();
-      }
-
-      #region Message Publishing
-
-      private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
-      {
-         // todo: handle session access ticket
-         AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
-            _channel.ChannelId, 0, exchangeName,
-            routingKey, mandatory, immediate
-            );
-
-         // fix message properties
-         if ( !_disableTimestamps )
-         {
-            message.Timestamp = DateTime.UtcNow.Ticks;
-            if (timeToLive != 0)
-            {
-                message.Expiration = message.Timestamp + timeToLive;
-            }
-         } else
-         {
-            message.Expiration = 0;
-         }
-         message.DeliveryMode = deliveryMode;
-         message.Priority = (byte)priority;
-
-         ByteBuffer payload = message.Data;
-         int payloadLength = payload.Limit;
-
-         ContentBody[] contentBodies = CreateContentBodies(payload);
-         AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
-         for ( int i = 0; i < contentBodies.Length; i++ )
-         {
-            frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
-         }
-         if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
-         {
-            _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
-         }
-
-         // weight argument of zero indicates no child content headers, just bodies
-         AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
-            _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0, 
-            message.ContentHeaderProperties, (uint)payloadLength
-            );
-         if ( _logger.IsDebugEnabled )
-         {
-            _logger.Debug(string.Format("Sending content header frame to  {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
-         }
-
-         frames[0] = publishFrame;
-         frames[1] = contentHeaderFrame;
-         CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-
-         lock ( _channel.Connection.FailoverMutex )
-         {
-            _channel.Connection.ProtocolWriter.Write(compositeFrame);
-         }
-      }
-
-
-      /// <summary>
-      /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
-      /// maximum frame size.
-      /// </summary>
-      /// <param name="payload"></param>
-      /// <returns>return the array of content bodies</returns>
-      private ContentBody[] CreateContentBodies(ByteBuffer payload)
-      {
-         if ( payload == null )
-         {
-            return null;
-         } else if ( payload.Remaining == 0 )
-         {
-            return new ContentBody[0];
-         }
-         // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
-         // (0xCE byte).
-         int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
-         int frameCount = CalculateContentBodyFrames(payload);
-         ContentBody[] bodies = new ContentBody[frameCount];
-         for ( int i = 0; i < frameCount; i++ )
-         {
-            int length = (payload.Remaining >= framePayloadMax)
-               ? framePayloadMax : payload.Remaining;
-            bodies[i] = new ContentBody(payload, (uint)length);
-         }
-         return bodies;
-      }
-
-      private int CalculateContentBodyFrames(ByteBuffer payload)
-      {
-         // we substract one from the total frame maximum size to account 
-         // for the end of frame marker in a body frame
-         // (0xCE byte).
-         int frameCount;
-         if ( (payload == null) || (payload.Remaining == 0) )
-         {
-            frameCount = 0;
-         } else
-         {
-            int dataLength = payload.Remaining;
-            int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
-            int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
-            frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
-         }
-
-         return frameCount;
-      }
-      #endregion // Message Publishing
-   }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Client
+{
+   public class BasicMessageProducer : Closeable, IMessagePublisher
+   {
+      protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
+
+      /// <summary>
+      /// If true, messages will not get a timestamp.
+      /// </summary>
+      private bool _disableTimestamps;
+
+      /// <summary>
+      /// Priority of messages created by this producer.
+      /// </summary>
+      private int _messagePriority;
+
+      /// <summary>
+      /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+      /// </summary>
+      private long _timeToLive;
+
+      /// <summary>
+      /// Delivery mode used for this producer.
+      /// </summary>
+      private DeliveryMode _deliveryMode;
+
+      private bool _immediate;
+      private bool _mandatory;
+
+      string _exchangeName;
+      string _routingKey;
+
+      /// <summary>
+      /// Default encoding used for messages produced by this producer.
+      /// </summary>
+      private string _encoding;
+
+      /// <summary>
+      /// Default encoding used for message produced by this producer.
+      /// </summary>
+      private string _mimeType;
+
+      /// <summary>
+      /// True if this producer was created from a transacted session
+      /// </summary>
+      private bool _transacted;
+
+      private ushort _channelId;
+
+      /// <summary>
+      /// This is an id generated by the session and is used to tie individual producers to the session. This means we
+      /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+      /// to the session so that when an error is propagated to the session it can close the producer (meaning that
+      /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+      /// </summary>
+      private long _producerId;
+
+      /// <summary>
+      /// The session used to create this producer
+      /// </summary>
+      private AmqChannel _channel;
+
+      public IChannel Channel { get {return _channel;}}
+      
+      public BasicMessageProducer(string exchangeName, string routingKey,
+          bool transacted,
+          ushort channelId,
+          AmqChannel channel,
+          long producerId,
+          DeliveryMode deliveryMode,
+          long timeToLive,
+          bool immediate,
+          bool mandatory,
+          int priority)
+      {
+         _exchangeName = exchangeName;
+         _routingKey = routingKey;
+         _transacted = transacted;
+         _channelId = channelId;
+         _channel = channel;
+         _producerId = producerId;
+         _deliveryMode = deliveryMode;
+         _timeToLive = timeToLive;
+         _immediate = immediate;
+         _mandatory = mandatory;
+         _messagePriority = priority;
+
+         _channel.RegisterProducer(producerId, this);
+      }
+
+
+      #region IMessagePublisher Members
+
+      public DeliveryMode DeliveryMode
+      {
+         get
+         {
+            CheckNotClosed();
+            return _deliveryMode;
+         }
+         set
+         {
+            CheckNotClosed();
+            _deliveryMode = value;
+         }
+      }
+
+      public string ExchangeName
+      {
+         get { return _exchangeName; }
+      }
+
+      public string RoutingKey
+      {
+         get { return _routingKey; }
+      }
+
+      public bool DisableMessageID
+      {
+         get
+         {
+            throw new Exception("The method or operation is not implemented.");
+         }
+         set
+         {
+            throw new Exception("The method or operation is not implemented.");
+         }
+      }
+
+      public bool DisableMessageTimestamp
+      {
+         get
+         {
+            CheckNotClosed();
+            return _disableTimestamps;
+         }
+         set
+         {
+            CheckNotClosed();
+            _disableTimestamps = value;
+         }
+      }
+
+      public int Priority
+      {
+         get
+         {
+            CheckNotClosed();
+            return _messagePriority;
+         }
+         set
+         {
+            CheckNotClosed();
+            if ( value < 0 || value > 9 )
+            {
+               throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
+            }
+            _messagePriority = value;
+         }
+      }
+
+      public override void Close()
+      {
+         _logger.Debug("Closing producer " + this);
+         Interlocked.Exchange(ref _closed, CLOSED);
+         _channel.DeregisterProducer(_producerId);
+      }
+
+      public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
+      {
+         CheckNotClosed();
+         SendImpl(
+            _exchangeName, 
+            _routingKey, 
+            (AbstractQmsMessage)msg, 
+            deliveryMode, 
+            priority, 
+            (uint)timeToLive, 
+            _mandatory,
+            _immediate
+            );
+      }
+
+      public void Send(IMessage msg)
+      {
+         CheckNotClosed();
+         SendImpl(
+            _exchangeName, 
+            _routingKey, 
+            (AbstractQmsMessage)msg, 
+            _deliveryMode, 
+            _messagePriority, 
+            (uint)_timeToLive,
+            _mandatory, 
+            _immediate
+            );
+      }
+
+      // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
+      // to facilitate publishing messages to potentially non-existent recipients.
+      public void Send(IMessage msg, bool mandatory)
+      {
+         CheckNotClosed();
+         SendImpl(
+            _exchangeName, 
+            _routingKey, 
+            (AbstractQmsMessage)msg, 
+            _deliveryMode, 
+            _messagePriority, 
+            (uint)_timeToLive,
+            mandatory, 
+            _immediate
+            );
+      }
+
+      public long TimeToLive
+      {
+         get
+         {
+            CheckNotClosed();
+            return _timeToLive;
+         }
+         set
+         {
+            CheckNotClosed();
+            if ( value < 0 )
+            {
+               throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
+            }
+            _timeToLive = value;
+         }
+      }
+
+      #endregion
+
+      public string MimeType
+      {
+         get
+         {
+            CheckNotClosed();
+            return _mimeType;
+         }
+         set
+         {
+            CheckNotClosed();
+            _mimeType = value;
+         }
+      }
+
+      public string Encoding
+      {
+         get
+         {
+            CheckNotClosed();
+            return _encoding;
+         }
+         set
+         {
+            CheckNotClosed();
+            _encoding = value;
+         }
+      }
+
+      public void Dispose()
+      {
+         Close();
+      }
+
+      #region Message Publishing
+
+      private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
+      {
+         // todo: handle session access ticket
+         AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
+            _channel.ChannelId, 0, exchangeName,
+            routingKey, mandatory, immediate
+            );
+
+         // fix message properties
+         if ( !_disableTimestamps )
+         {
+            message.Timestamp = DateTime.UtcNow.Ticks;
+            if (timeToLive != 0)
+            {
+                message.Expiration = message.Timestamp + timeToLive;
+            }
+         } else
+         {
+            message.Expiration = 0;
+         }
+         message.DeliveryMode = deliveryMode;
+         message.Priority = (byte)priority;
+
+         ByteBuffer payload = message.Data;
+         int payloadLength = payload.Limit;
+
+         ContentBody[] contentBodies = CreateContentBodies(payload);
+         AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+         for ( int i = 0; i < contentBodies.Length; i++ )
+         {
+            frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+         }
+         if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
+         {
+            _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+         }
+
+         // weight argument of zero indicates no child content headers, just bodies
+         AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
+            _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0, 
+            message.ContentHeaderProperties, (uint)payloadLength
+            );
+         if ( _logger.IsDebugEnabled )
+         {
+            _logger.Debug(string.Format("Sending content header frame to  {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+         }
+
+         frames[0] = publishFrame;
+         frames[1] = contentHeaderFrame;
+         CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+         lock ( _channel.Connection.FailoverMutex )
+         {
+            _channel.Connection.ProtocolWriter.Write(compositeFrame);
+         }
+      }
+
+
+      /// <summary>
+      /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+      /// maximum frame size.
+      /// </summary>
+      /// <param name="payload"></param>
+      /// <returns>return the array of content bodies</returns>
+      private ContentBody[] CreateContentBodies(ByteBuffer payload)
+      {
+         if ( payload == null )
+         {
+            return null;
+         } else if ( payload.Remaining == 0 )
+         {
+            return new ContentBody[0];
+         }
+         // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+         // (0xCE byte).
+         int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
+         int frameCount = CalculateContentBodyFrames(payload);
+         ContentBody[] bodies = new ContentBody[frameCount];
+         for ( int i = 0; i < frameCount; i++ )
+         {
+            int length = (payload.Remaining >= framePayloadMax)
+               ? framePayloadMax : payload.Remaining;
+            bodies[i] = new ContentBody(payload, (uint)length);
+         }
+         return bodies;
+      }
+
+      private int CalculateContentBodyFrames(ByteBuffer payload)
+      {
+         // we substract one from the total frame maximum size to account 
+         // for the end of frame marker in a body frame
+         // (0xCE byte).
+         int frameCount;
+         if ( (payload == null) || (payload.Remaining == 0) )
+         {
+            frameCount = 0;
+         } else
+         {
+            int dataLength = payload.Remaining;
+            int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
+            int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+            frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
+         }
+
+         return frameCount;
+      }
+      #endregion // Message Publishing
+   }
+}

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs?rev=668164&r1=668163&r2=668164&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs Mon Jun 16 07:02:16 2008
@@ -1,694 +1,699 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Text;
-using log4net;
-using Apache.Qpid.Framing;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Buffer;
-
-namespace Apache.Qpid.Client.Message
-{
-    public abstract class AbstractQmsMessage : AMQMessage, IMessage
-    {
-        private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage));
-
-        protected bool _redelivered;
-
-        protected ByteBuffer _data;
-        protected bool _readableMessage = false;
-        private QpidHeaders _headers;
-
-        protected AbstractQmsMessage(ByteBuffer data)
-            : base(new BasicContentHeaderProperties())
-        {
-            Init(data);
-        }
-
-        protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
-            : this(contentHeader, deliveryTag)
-        {
-            Init(data);
-        }
-
-        protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
-        {
-            Init(null);
-        }
-
-        private void Init(ByteBuffer data)
-        {
-            _data = data;
-            if ( _data != null )
-            {
-                _data.Acquire();
-            }
-            _readableMessage = (data != null);
-            if ( ContentHeaderProperties.Headers == null )
-                ContentHeaderProperties.Headers = new FieldTable();
-            _headers = new QpidHeaders(ContentHeaderProperties.Headers);
-        }
-
-        //
-        // Properties
-        //
-
-        /// <summary>
-        /// The application message identifier
-        /// </summary>
-        public string MessageId
-        {
-            get 
-            {
-                if (ContentHeaderProperties.MessageId == null)
-                {
-                    ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
-                }
-                return ContentHeaderProperties.MessageId;
-            }
-            set { ContentHeaderProperties.MessageId = value; }
-        }
-
-        /// <summary>
-        /// The message timestamp
-        /// </summary>
-        public long Timestamp
-        {
-            get
-            {
-                // TODO: look at ulong/long choice
-                return (long) ContentHeaderProperties.Timestamp;
-            }
-            set
-            {
-                ContentHeaderProperties.Timestamp = (ulong) value;
-            }
-        }        
-
-        /// <summary>
-        /// The <see cref="CorrelationId"/> as a byte array.
-        /// </summary>
-        public byte[] CorrelationIdAsBytes
-        {
-            get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); }
-            set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); }
-        }
-
-        /// <summary>
-        /// The application correlation identifier
-        /// </summary>
-        public string CorrelationId
-        {
-            get { return ContentHeaderProperties.CorrelationId; }
-            set { ContentHeaderProperties.CorrelationId = value; }
-        }
-        
-        struct Dest
-        {
-            public string ExchangeName;
-            public string RoutingKey;
-
-            public Dest(string exchangeName, string routingKey)
-            {
-                ExchangeName = exchangeName;
-                RoutingKey = routingKey;
-            }
-        }
-
-        /// <summary>
-        /// Exchange name of the reply-to address
-        /// </summary>
-        public string ReplyToExchangeName
-        {
-            get
-            {
-                return ReadReplyToHeader().ExchangeName;
-            }
-            set
-            {
-                BindingURL dest = ReadReplyToHeader();
-                dest.ExchangeName = value;
-                WriteReplyToHeader(dest);
-            }
-        }
-
-        /// <summary>
-        /// Routing key of the reply-to address
-        /// </summary>
-        public string ReplyToRoutingKey
-        {
-            get
-            {
-                return ReadReplyToHeader().RoutingKey;
-            }
-            set
-            {
-                BindingURL dest = ReadReplyToHeader();
-                dest.RoutingKey = value;
-                WriteReplyToHeader(dest);
-            }
-        }
-
-        /// <summary>
-        /// Non-persistent (1) or persistent (2)
-        /// </summary>
-        public DeliveryMode DeliveryMode
-        {
-            get
-            {
-                byte b = ContentHeaderProperties.DeliveryMode;
-                switch (b)
-                {
-                case 1:
-                    return DeliveryMode.NonPersistent;
-                case 2:
-                    return DeliveryMode.Persistent;
-                default:
-                    throw new QpidException("Illegal value for delivery mode in content header properties");
-                }                
-            }
-            set
-            {
-                ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
-            }
-        }        
-
-        /// <summary>
-        /// True, if this is a redelivered message
-        /// </summary>
-        public bool Redelivered
-        {
-            get { return _redelivered; }
-            set { _redelivered = value; }
-        }
-
-        /// <summary>
-        /// The message type name
-        /// </summary>
-        public string Type
-        {
-            get { return ContentHeaderProperties.Type; }
-            set { ContentHeaderProperties.Type = value; }
-        }
-
-        /// <summary>
-        /// Message expiration specification
-        /// </summary>
-        public long Expiration
-        {
-            get { return ContentHeaderProperties.Expiration; }
-            set { ContentHeaderProperties.Expiration = value; }
-        }
-
-        /// <summary>
-        /// The message priority, 0 to 9
-        /// </summary>
-        public byte Priority
-        {
-            get { return ContentHeaderProperties.Priority; }
-            set { ContentHeaderProperties.Priority = (byte) value; }
-        }
-
-        /// <summary>
-        /// The MIME Content Type
-        /// </summary>
-        public string ContentType
-        {
-            get { return ContentHeaderProperties.ContentType; }
-            set { ContentHeaderProperties.ContentType = value; }
-        }
-
-        /// <summary>
-        /// The MIME Content Encoding
-        /// </summary>
-        public string ContentEncoding
-        {
-            get { return ContentHeaderProperties.Encoding; }
-            set { ContentHeaderProperties.Encoding = value; }
-        }
-
-        /// <summary>
-        /// Headers of this message
-        /// </summary>
-        public IHeaders Headers
-        {
-            get { return _headers; }
-        }
-
-        /// <summary>
-        /// The creating user id
-        /// </summary>
-        public string UserId
-        {
-            get { return ContentHeaderProperties.UserId; }
-            set { ContentHeaderProperties.UserId = value; }
-        }
-
-        /// <summary>
-        /// The creating application id
-        /// </summary>
-        public string AppId
-        {
-            get { return ContentHeaderProperties.AppId; }
-            set { ContentHeaderProperties.AppId = value; }
-        }
-
-        /// <summary>
-        /// Intra-cluster routing identifier
-        /// </summary>
-        public string ClusterId
-        {
-            get { return ContentHeaderProperties.ClusterId; }
-            set { ContentHeaderProperties.ClusterId = value; }
-        }
-
-        /// <summary>
-        /// Return the raw byte array that is used to populate the frame when sending
-        /// the message.
-        /// </summary>
-        /// <value>a byte array of message data</value>                
-        public ByteBuffer Data
-        {
-            get
-            {
-                if (_data != null)
-                {
-                    if (!_readableMessage)
-                    {
-                        _data.Flip();
-                    }
-                    else
-                    {
-                        // Make sure we rewind the data just in case any method has moved the
-                        // position beyond the start.
-                        _data.Rewind();
-                    }
-                }
-                return _data;
-            }
-
-            set
-            {
-                _data = value;
-            }
-        }
-
-        public void Acknowledge()
-        {
-            // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
-            // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
-            if (_channel != null)
-            {
-                // we set multiple to true here since acknowledgement implies acknowledge of all count messages
-                // received on the session
-                _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
-            }
-
-        }
-
-        public abstract void ClearBodyImpl();
-
-        public void ClearBody()
-        {
-            ClearBodyImpl();
-            _readableMessage = false;
-        }
-
-        /// <summary>
-        /// Get a String representation of the body of the message. Used in the
-        /// toString() method which outputs this before message properties.
-        /// </summary>
-        /// <exception cref="QpidException"></exception>
-        public abstract string ToBodyString();
-
-        public override string ToString()
-        {
-            try
-            {
-                StringBuilder buf = new StringBuilder("Body:\n");
-                buf.Append(ToBodyString());
-                buf.Append("\nQmsTimestamp: ").Append(Timestamp);
-                buf.Append("\nQmsExpiration: ").Append(Expiration);
-                buf.Append("\nQmsPriority: ").Append(Priority);
-                buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
-                buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
-                buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
-                buf.Append("\nAMQ message number: ").Append(DeliveryTag);
-                buf.Append("\nProperties:");
-                if (ContentHeaderProperties.Headers == null)
-                {
-                    buf.Append("<NONE>");
-                }
-                else
-                {
-                    buf.Append(Headers.ToString());
-                }
-                return buf.ToString();
-            }
-            catch (Exception e)
-            {
-                return e.ToString();
-            }
-        }
-
-        public FieldTable PopulateHeadersFromMessageProperties()
-        {
-            if (ContentHeaderProperties.Headers == null)
-            {
-                return null;
-            }
-            else
-            {
-                //
-                // We need to convert every property into a String representation
-                // Note that type information is preserved in the property name
-                //
-                FieldTable table = new FieldTable();
-                foreach (DictionaryEntry entry in  ContentHeaderProperties.Headers)
-                {                    
-                    string propertyName = (string) entry.Key;
-                    if (propertyName == null)
-                    {
-                        continue;
-                    }
-                    else
-                    {
-                        table[propertyName] = entry.Value.ToString();
-                    }
-                }
-                return table;
-            }
-        }
-
-        public BasicContentHeaderProperties ContentHeaderProperties
-        {
-            get
-            {
-                return (BasicContentHeaderProperties) _contentHeaderProperties;
-            }
-        }
-
-        protected virtual void Reset()
-        {
-            _readableMessage = true;
-        }
-
-        public bool IsReadable
-        {
-            get { return _readableMessage; }
-        }
-
-        public bool isWritable
-        {
-            get { return !_readableMessage; }
-        }
-
-        protected void CheckReadable()
-        {
-            if ( !_readableMessage )
-            {
-                throw new MessageNotReadableException("You need to call reset() to make the message readable");
-            }
-        }
-
-        /// <summary>
-        /// Decodes the replyto field if one is set.
-        /// 
-        /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
-        /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
-        /// empty the replyto field is expected to being with ':'.
-        /// 
-        /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
-        /// </summary>
-        /// 
-        /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
-        private BindingURL ReadReplyToHeader()
-        {
-            string replyToEncoding = ContentHeaderProperties.ReplyTo;
-            //log.Debug("replyToEncoding = " + replyToEncoding);
-
-            BindingURL bindingUrl = new BindingURL(replyToEncoding);
-            //log.Debug("bindingUrl = " + bindingUrl.ToString());
-
-            return bindingUrl;
-           
-            //log.Info("replyToEncoding = " + replyToEncoding);
-
-//             if ( replyToEncoding == null )
-//             {
-//                 return new Dest();
-//             } else
-//             {
-//                 // Split the replyto field on a ':'
-//                 string[] split = replyToEncoding.Split(':');
-
-//                 // Ensure that the replyto field argument only consisted of two parts.
-//                 if ( split.Length != 2 )
-//                 {
-//                     throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
-//                 }
-
-//                 // Extract the exchange name and routing key from the split replyto field.
-//                 string exchangeName = split[0];
-
-//                 string[] split2 = split[1].Split('/');
-//                 string routingKey = split2[3];
-
-//                 return new Dest(exchangeName, routingKey);
-//             }
-        }
-
-        private void WriteReplyToHeader(BindingURL dest)
-        {
-            string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
-            ContentHeaderProperties.ReplyTo = encodedDestination;
-        }
-    }
-
-    public class BindingURL
-    {
-        public readonly static string OPTION_EXCLUSIVE = "exclusive";
-        public readonly static string OPTION_AUTODELETE = "autodelete";
-        public readonly static string OPTION_DURABLE = "durable";
-        public readonly static string OPTION_CLIENTID = "clientid";
-        public readonly static string OPTION_SUBSCRIPTION = "subscription";
-        public readonly static string OPTION_ROUTING_KEY = "routingkey";
-
-        /// <summary> Holds the undecoded URL </summary>
-        string url;
-
-        /// <summary> Holds the decoded options. </summary>
-        IDictionary options = new Hashtable();
-        
-        /// <summary> Holds the decoded exchange class. </summary>
-        string exchangeClass;
-
-        /// <summary> Holds the decoded exchange name. </summary>
-        string exchangeName;
-
-        /// <summary> Holds the destination name. </summary>
-        string destination;
-
-        /// <summary> Holds the decoded queue name. </summary>
-        string queueName;
-
-        /// <summary>
-        /// The binding URL has the format:
-        /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-        /// </summary>
-        public BindingURL(string url)
-        {
-            this.url = url;
-            Parse();
-        }
-
-        public string Url { get { return url; } }
-
-        public string ExchangeClass
-        {
-            get { return exchangeClass; }
-            set { exchangeClass = value; }
-        }
-
-        public string ExchangeName
-        {
-            get { return exchangeName; } 
-            set { exchangeName = value; }
-        }
-
-        public string QueueName
-        {
-            get { return queueName; } 
-            set { queueName = value; }
-        }
-
-        public string DestinationName
-        {
-            get { return destination; } 
-            set { destination = value; }
-        }
-
-        public string RoutingKey {
-            get { return (string)options[OPTION_ROUTING_KEY]; }
-            set { options[OPTION_ROUTING_KEY] = value; }
-        }
-
-        public bool ContainsOption(string key) { return options.Contains(key); }
-
-        public string ToString() 
-        {
-            return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName + 
-                ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] ";
-        }
-
-        private void Parse()
-        {
-            Uri binding = new Uri(url);
-
-            // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified.
-            string exchangeClass = binding.Scheme;
-
-            if (exchangeClass == null)
-            {
-                url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
-                Parse();
-
-                return;
-            }
-            else
-            {
-                this.exchangeClass = exchangeClass;
-            }
-
-            // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified.
-            string exchangeName = binding.Host;
-
-            if (exchangeName == null)
-            {
-                if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
-                {
-                    this.exchangeName = "";
-                }
-            }
-            else
-            {
-                this.exchangeName = exchangeName;
-            }
-
-            // Extract the destination and queue name.
-            if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals(""))
-            {
-                throw new UriFormatException("Destination or Queue required");
-            }
-            else
-            {
-                int slashOffset = binding.AbsolutePath.IndexOf("/", 1);
-                if (slashOffset == -1)
-                {
-                    throw new UriFormatException("Destination required");
-                }
-                else
-                {
-                    String path = binding.AbsolutePath;
-
-                    this.destination = path.Substring(1, slashOffset - 1);
-                    this.queueName = path.Substring(slashOffset + 1);
-                }
-            }
-
-            ParseOptions(options, binding.Query);
-
-            // If the routing key is not set as an option, set it to the destination name.
-            if (!ContainsOption(OPTION_ROUTING_KEY))
-            {
-                options[OPTION_ROUTING_KEY] = destination;
-            }
-        }
-
-        /// <summary>
-        /// options looks like this
-        /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'
-        /// </summary>
-        public static void ParseOptions(IDictionary optionMap, string options)
-        {
-            // Check that there really are some options to parse.
-            if ((options == null) || (options.IndexOf('=') == -1))
-            {
-                return;
-            }
-
-            int optionIndex = options.IndexOf('=');            
-            string option = options.Substring(0, optionIndex);            
-            int length = options.Length;            
-            int nestedQuotes = 0;
-
-            // Holds the index of the final "'".
-            int valueIndex = optionIndex;
-
-            // Loop over all the options.Dest
-            while ((nestedQuotes > 0) || (valueIndex < length))
-            {
-                valueIndex++;
-
-                if (valueIndex >= length)
-                {
-                    break;
-                }
-
-                if (options[valueIndex] == '\'')
-                {
-                    if ((valueIndex + 1) < options.Length)
-                    {
-                        if ((options[valueIndex + 1] == '&') || 
-                            (options[valueIndex + 1] == ',') ||
-                            (options[valueIndex + 1] == ';') ||
-                            (options[valueIndex + 1] == '\''))
-                        {
-                            nestedQuotes--;
-                            
-                            if (nestedQuotes == 0)
-                            {
-                                // We've found the value of an option
-                                break;
-                            }
-                        }
-                        else
-                        {
-                            nestedQuotes++;
-                        }
-                    }
-                    else
-                    {
-                        // We are at the end of the string
-                        // Check to see if we are corectly closing quotes
-                        if (options[valueIndex] == '\'')
-                        {
-                            nestedQuotes--;
-                        }
-
-                        break;
-                    }
-                }
-            }
-        }        
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using log4net;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Buffer;
+
+namespace Apache.Qpid.Client.Message
+{
+    public abstract class AbstractQmsMessage : AMQMessage, IMessage
+    {
+        private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage));
+
+        protected bool _redelivered;
+
+        protected ByteBuffer _data;
+        protected bool _readableMessage = false;
+        private QpidHeaders _headers;
+
+        protected AbstractQmsMessage(ByteBuffer data)
+            : base(new BasicContentHeaderProperties())
+        {
+            Init(data);
+        }
+
+        protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+            : this(contentHeader, deliveryTag)
+        {
+            Init(data);
+        }
+
+        protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
+        {
+            Init(null);
+        }
+
+        // Stores the payload (calling Acquire() on it when present), marks the message readable
+        // only if a payload was supplied, and wraps the header table in a QpidHeaders instance.
+        private void Init(ByteBuffer data)
+        {
+            bool hasPayload = (data != null);
+            _data = data;
+            if ( hasPayload ) { _data.Acquire(); }
+            _readableMessage = hasPayload;
+
+            if ( ContentHeaderProperties.Headers == null ) { ContentHeaderProperties.Headers = new FieldTable(); }
+            _headers = new QpidHeaders(ContentHeaderProperties.Headers);
+        }
+
+        //
+        // Properties
+        //
+
+        /// <summary>
+        /// The application message identifier. If none is set, the getter assigns "ID:" plus the delivery tag and returns it.
+        /// </summary>
+        public string MessageId
+        {
+            get 
+            {
+                if (ContentHeaderProperties.MessageId == null)
+                {
+                    ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
+                }
+                return ContentHeaderProperties.MessageId;
+            }
+            set { ContentHeaderProperties.MessageId = value; }
+        }
+
+        /// <summary>
+        /// The message timestamp
+        /// </summary>
+        public long Timestamp
+        {
+            get
+            {
+                // TODO: look at ulong/long choice
+                return (long) ContentHeaderProperties.Timestamp;
+            }
+            set
+            {
+                ContentHeaderProperties.Timestamp = (ulong) value;
+            }
+        }        
+
+        /// <summary>
+        /// The <see cref="CorrelationId"/> as bytes in the default encoding; null when unset (assigning null clears it).
+        /// </summary>
+        public byte[] CorrelationIdAsBytes
+        {
+            get { string id = ContentHeaderProperties.CorrelationId; return (id == null) ? null : Encoding.Default.GetBytes(id); }
+            set { ContentHeaderProperties.CorrelationId = (value == null) ? null : Encoding.Default.GetString(value); }
+        }
+
+        /// <summary>
+        /// The application correlation identifier
+        /// </summary>
+        public string CorrelationId
+        {
+            get { return ContentHeaderProperties.CorrelationId; }
+            set { ContentHeaderProperties.CorrelationId = value; }
+        }
+        
+        struct Dest
+        {
+            public string ExchangeName;
+            public string RoutingKey;
+
+            public Dest(string exchangeName, string routingKey)
+            {
+                ExchangeName = exchangeName;
+                RoutingKey = routingKey;
+            }
+        }
+
+        /// <summary>
+        /// Exchange name of the reply-to address
+        /// </summary>
+        public string ReplyToExchangeName
+        {
+            get
+            {
+                return ReadReplyToHeader().ExchangeName;
+            }
+            set
+            {
+                BindingURL dest = ReadReplyToHeader();
+                dest.ExchangeName = value;
+                WriteReplyToHeader(dest);
+            }
+        }
+
+        /// <summary>
+        /// Routing key of the reply-to address
+        /// </summary>
+        public string ReplyToRoutingKey
+        {
+            get
+            {
+                return ReadReplyToHeader().RoutingKey;
+            }
+            set
+            {
+                BindingURL dest = ReadReplyToHeader();
+                dest.RoutingKey = value;
+                WriteReplyToHeader(dest);
+            }
+        }
+
+        /// <summary>
+        /// Non-persistent (1) or persistent (2). The getter throws QpidException for any other stored value; the setter stores 2 for anything but NonPersistent.
+        /// </summary>
+        public DeliveryMode DeliveryMode
+        {
+            get
+            {
+                byte b = ContentHeaderProperties.DeliveryMode;
+                switch (b)
+                {
+                case 1:
+                    return DeliveryMode.NonPersistent;
+                case 2:
+                    return DeliveryMode.Persistent;
+                default:
+                    throw new QpidException("Illegal value for delivery mode in content header properties");
+                }                
+            }
+            set
+            {
+                ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
+            }
+        }        
+
+        /// <summary>
+        /// True, if this is a redelivered message
+        /// </summary>
+        public bool Redelivered
+        {
+            get { return _redelivered; }
+            set { _redelivered = value; }
+        }
+
+        /// <summary>
+        /// The message type name
+        /// </summary>
+        public string Type
+        {
+            get { return ContentHeaderProperties.Type; }
+            set { ContentHeaderProperties.Type = value; }
+        }
+
+        /// <summary>
+        /// Message expiration specification
+        /// </summary>
+        public long Expiration
+        {
+            get { return ContentHeaderProperties.Expiration; }
+            set { ContentHeaderProperties.Expiration = value; }
+        }
+
+        /// <summary>
+        /// The message priority, 0 to 9
+        /// </summary>
+        public byte Priority
+        {
+            get { return ContentHeaderProperties.Priority; }
+            set { ContentHeaderProperties.Priority = (byte) value; }
+        }
+
+        /// <summary>
+        /// The MIME Content Type
+        /// </summary>
+        public string ContentType
+        {
+            get { return ContentHeaderProperties.ContentType; }
+            set { ContentHeaderProperties.ContentType = value; }
+        }
+
+        /// <summary>
+        /// The MIME Content Encoding
+        /// </summary>
+        public string ContentEncoding
+        {
+            get { return ContentHeaderProperties.Encoding; }
+            set { ContentHeaderProperties.Encoding = value; }
+        }
+
+        /// <summary>
+        /// Headers of this message
+        /// </summary>
+        public IHeaders Headers
+        {
+            get { return _headers; }
+        }
+
+        /// <summary>
+        /// The creating user id
+        /// </summary>
+        public string UserId
+        {
+            get { return ContentHeaderProperties.UserId; }
+            set { ContentHeaderProperties.UserId = value; }
+        }
+
+        /// <summary>
+        /// The creating application id
+        /// </summary>
+        public string AppId
+        {
+            get { return ContentHeaderProperties.AppId; }
+            set { ContentHeaderProperties.AppId = value; }
+        }
+
+        /// <summary>
+        /// Intra-cluster routing identifier
+        /// </summary>
+        public string ClusterId
+        {
+            get { return ContentHeaderProperties.ClusterId; }
+            set { ContentHeaderProperties.ClusterId = value; }
+        }
+
+        /// <summary>
+        /// The buffer holding the message body. Reading this property is not side-effect free: a non-null
+        /// buffer is flipped when the message is not readable and rewound when it is, before being returned.
+        /// </summary>
+        /// <value>the message data buffer; may be null</value>
+        public ByteBuffer Data
+        {
+            get
+            {
+                if (_data != null)
+                {
+                    if (!_readableMessage)
+                    {
+                        _data.Flip();
+                    }
+                    else
+                    {
+                        // Make sure we rewind the data just in case any method has moved the
+                        // position beyond the start.
+                        _data.Rewind();
+                    }
+                }
+                return _data;
+            }
+
+            set
+            {
+                _data = value;
+            }
+        }
+
+        /// <summary>
+        /// Acknowledges this message by calling <c>Acknowledge(true)</c>. In the JMS style this
+        /// acknowledgement is meant to cover all messages received on the session so far.
+        /// </summary>
+        public void Acknowledge()
+        {
+            const bool ackPreviousMessages = true;
+            Acknowledge(ackPreviousMessages);
+        }
+        
+        /// <summary>
+        /// Acknowledges this message on its channel, if it has one.
+        /// </summary>
+        /// <param name="ackprevious">forwarded to the channel together with this message's delivery tag</param>
+        public void Acknowledge(bool ackprevious)
+        {
+            // JMS 1.1 section 3.6: calls to acknowledge are ignored unless client acknowledge mode
+            // was specified. The channel field is only set in that mode, so without it there is nothing to do.
+            if (_channel == null)
+            {
+                return;
+            }
+
+            _channel.AcknowledgeMessage((ulong)DeliveryTag, ackprevious);
+        }
+
+        /// <summary>
+        /// Subclass hook called by ClearBody before the message is switched to the writable state.
+        /// Presumably discards the type-specific body content - confirm against the implementations.
+        /// </summary>
+        public abstract void ClearBodyImpl();
+
+        /// <summary>
+        /// Calls ClearBodyImpl and then marks the message as no longer readable (i.e. writable).
+        /// </summary>
+        public void ClearBody()
+        {
+            ClearBodyImpl();
+            _readableMessage = false;
+        }
+
+        /// <summary>
+        /// Gets a string representation of the body of the message. Used by ToString(),
+        /// which outputs it ahead of the message properties.
+        /// </summary>
+        /// <exception cref="QpidException">NOTE(review): the conditions are not visible here - confirm with the implementations.</exception>
+        public abstract string ToBodyString();
+
+        /// <summary>
+        /// Builds a multi-line dump of the message: the body first, then selected header
+        /// fields, then the properties.
+        /// </summary>
+        /// <returns>the dump, or the text of any exception raised while building it</returns>
+        public override string ToString()
+        {
+            try
+            {
+                StringBuilder dump = new StringBuilder("Body:\n");
+                dump.Append(ToBodyString());
+
+                dump.Append("\nQmsTimestamp: ");
+                dump.Append(Timestamp);
+
+                dump.Append("\nQmsExpiration: ");
+                dump.Append(Expiration);
+
+                dump.Append("\nQmsPriority: ");
+                dump.Append(Priority);
+
+                dump.Append("\nQmsDeliveryMode: ");
+                dump.Append(DeliveryMode);
+
+                dump.Append("\nReplyToExchangeName: ");
+                dump.Append(ReplyToExchangeName);
+
+                dump.Append("\nReplyToRoutingKey: ");
+                dump.Append(ReplyToRoutingKey);
+
+                dump.Append("\nAMQ message number: ");
+                dump.Append(DeliveryTag);
+
+                dump.Append("\nProperties:");
+                dump.Append(ContentHeaderProperties.Headers == null ? "<NONE>" : Headers.ToString());
+
+                return dump.ToString();
+            }
+            catch (Exception failure)
+            {
+                // Never throws from here: the failure itself is reported as the string.
+                return failure.ToString();
+            }
+        }
+
+        /// <summary>
+        /// Copies the content header's headers into a new field table, converting every value
+        /// to its string form.
+        /// </summary>
+        /// <returns>the new table, or null when the content header has no headers</returns>
+        public FieldTable PopulateHeadersFromMessageProperties()
+        {
+            if (ContentHeaderProperties.Headers == null)
+            {
+                return null;
+            }
+
+            // Every property is stored as a string; per the original note, the type
+            // information is preserved in the property name.
+            FieldTable stringified = new FieldTable();
+            foreach (DictionaryEntry header in ContentHeaderProperties.Headers)
+            {
+                string name = (string) header.Key;
+                if (name != null)
+                {
+                    stringified[name] = header.Value.ToString();
+                }
+            }
+            return stringified;
+        }
+
+        /// <summary>
+        /// The content header properties of this message, cast to the basic-class properties type.
+        /// </summary>
+        public BasicContentHeaderProperties ContentHeaderProperties
+        {
+            get { return (BasicContentHeaderProperties) _contentHeaderProperties; }
+        }
+
+        /// <summary>
+        /// Marks the message as readable. Virtual, so subclasses may extend it.
+        /// </summary>
+        protected virtual void Reset()
+        {
+            _readableMessage = true;
+        }
+
+        /// <summary>
+        /// True while the message is in the readable state.
+        /// </summary>
+        public bool IsReadable
+        {
+            get
+            {
+                return _readableMessage;
+            }
+        }
+
+        /// <summary>
+        /// True while the message is not in the readable state.
+        /// </summary>
+        public bool isWritable
+        {
+            get
+            {
+                return !_readableMessage;
+            }
+        }
+
+        /// <summary>
+        /// Guard for read operations: does nothing when the message is readable.
+        /// </summary>
+        /// <exception cref="MessageNotReadableException">the message is not in the readable state</exception>
+        protected void CheckReadable()
+        {
+            if (_readableMessage)
+            {
+                return;
+            }
+
+            throw new MessageNotReadableException("You need to call reset() to make the message readable");
+        }
+
+        /// <summary>
+        /// Decodes the reply-to field of the content header as a binding URL.
+        /// </summary>
+        /// <remarks>
+        /// The raw ReplyTo value is handed unchanged to the BindingURL constructor, which parses
+        /// it with System.Uri.
+        /// NOTE(review): there is no null check, so an unset reply-to presumably surfaces as an
+        /// exception from that constructor - confirm. Also, WriteReplyToHeader stores
+        /// "exchange:routingkey" rather than a full binding URL - verify that the two formats round-trip.
+        /// </remarks>
+        /// <returns>the binding URL parsed from the reply-to field</returns>
+        private BindingURL ReadReplyToHeader()
+        {
+            return new BindingURL(ContentHeaderProperties.ReplyTo);
+        }
+
+        /// <summary>
+        /// Stores the destination in the reply-to field as its exchange name and routing key
+        /// joined by a ':'.
+        /// </summary>
+        /// <param name="dest">binding URL supplying the exchange name and routing key</param>
+        private void WriteReplyToHeader(BindingURL dest)
+        {
+            string replyTo = dest.ExchangeName + ":" + dest.RoutingKey;
+            ContentHeaderProperties.ReplyTo = replyTo;
+        }
+    }
+
+    /// <summary>
+    /// Decoded form of a binding URL: exchange class, exchange name, destination, queue name
+    /// and a table of options (including the routing key).
+    /// </summary>
+    public class BindingURL
+    {
+        public readonly static string OPTION_EXCLUSIVE = "exclusive";
+        public readonly static string OPTION_AUTODELETE = "autodelete";
+        public readonly static string OPTION_DURABLE = "durable";
+        public readonly static string OPTION_CLIENTID = "clientid";
+        public readonly static string OPTION_SUBSCRIPTION = "subscription";
+        public readonly static string OPTION_ROUTING_KEY = "routingkey";
+
+        /// <summary> Holds the undecoded URL </summary>
+        string url;
+
+        /// <summary> Holds the decoded options. </summary>
+        IDictionary options = new Hashtable();
+        
+        /// <summary> Holds the decoded exchange class. </summary>
+        string exchangeClass;
+
+        /// <summary> Holds the decoded exchange name. </summary>
+        string exchangeName;
+
+        /// <summary> Holds the destination name. </summary>
+        string destination;
+
+        /// <summary> Holds the decoded queue name. </summary>
+        string queueName;
+
+        /// <summary>
+        /// Decodes the given binding URL, which has the format:
+        /// exch_class://exch_name/[destination]/[queue]?option='value'[,option='value']*
+        /// </summary>
+        /// <param name="url">the binding URL to decode</param>
+        /// <exception cref="UriFormatException">the URL cannot be parsed, has no destination, or has a malformed option</exception>
+        public BindingURL(string url)
+        {
+            this.url = url;
+            Parse();
+        }
+
+        /// <summary> The undecoded URL. </summary>
+        public string Url { get { return url; } }
+
+        /// <summary> The exchange class, taken from the URL scheme. </summary>
+        public string ExchangeClass
+        {
+            get { return exchangeClass; }
+            set { exchangeClass = value; }
+        }
+
+        /// <summary> The exchange name, taken from the URL host. </summary>
+        public string ExchangeName
+        {
+            get { return exchangeName; } 
+            set { exchangeName = value; }
+        }
+
+        /// <summary> The queue name, taken from the second path segment. </summary>
+        public string QueueName
+        {
+            get { return queueName; } 
+            set { queueName = value; }
+        }
+
+        /// <summary> The destination name, taken from the first path segment. </summary>
+        public string DestinationName
+        {
+            get { return destination; } 
+            set { destination = value; }
+        }
+
+        /// <summary>
+        /// The routing key option. After parsing it is the "routingkey" option of the URL, or
+        /// the destination name when the URL does not specify one.
+        /// </summary>
+        public string RoutingKey {
+            get { return (string)options[OPTION_ROUTING_KEY]; }
+            set { options[OPTION_ROUTING_KEY] = value; }
+        }
+
+        /// <summary> Tests whether an option with the given name is present. </summary>
+        public bool ContainsOption(string key) { return options.Contains(key); }
+
+        /// <summary>
+        /// Describes the decoded parts of the URL. Declared as an override so that the text is
+        /// also produced when the instance is formatted through an object reference.
+        /// </summary>
+        public override string ToString() 
+        {
+            return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName + 
+                ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] ";
+        }
+
+        /// <summary>
+        /// Splits the URL into exchange class, exchange name, destination, queue name and options.
+        /// </summary>
+        private void Parse()
+        {
+            Uri binding = new Uri(url);
+
+            // The URI scheme carries the exchange class.
+            // NOTE(review): System.Uri does not appear to report a null scheme for a successfully
+            // constructed URI, so this defaulting branch looks unreachable; a URL without a scheme
+            // is rejected by the Uri constructor instead - confirm before relying on the default.
+            string scheme = binding.Scheme;
+            if (scheme == null)
+            {
+                url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
+                Parse();
+
+                return;
+            }
+            this.exchangeClass = scheme;
+
+            // The host name carries the exchange name; the direct exchange class defaults to the empty name.
+            string host = binding.Host;
+            if (host != null)
+            {
+                this.exchangeName = host;
+            }
+            else if (scheme.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
+            {
+                this.exchangeName = "";
+            }
+
+            // The path carries "/destination/queue".
+            string path = binding.AbsolutePath;
+            if ((path == null) || path.Equals(""))
+            {
+                throw new UriFormatException("Destination or Queue required");
+            }
+
+            int slashOffset = path.IndexOf('/', 1);
+            if (slashOffset == -1)
+            {
+                throw new UriFormatException("Destination required");
+            }
+
+            this.destination = path.Substring(1, slashOffset - 1);
+            this.queueName = path.Substring(slashOffset + 1);
+
+            // Uri.Query starts with the '?' delimiter, which must not become part of the first option name.
+            string query = binding.Query;
+            if ((query != null) && (query.Length > 0) && (query[0] == '?'))
+            {
+                query = query.Substring(1);
+            }
+
+            ParseOptions(options, query);
+
+            // If the routing key is not set as an option, set it to the destination name.
+            if (!ContainsOption(OPTION_ROUTING_KEY))
+            {
+                options[OPTION_ROUTING_KEY] = destination;
+            }
+        }
+
+        /// <summary>
+        /// Parses a string of the form option='value'[,option='value']* into the given map.
+        /// Options may be separated by ',', '&amp;' or ';', and a value may itself contain
+        /// nested quoted options, for example:
+        /// brokerlist='tcp://host:port?option='value',option='value'',failover='method'
+        /// </summary>
+        /// <param name="optionMap">receives one entry per option, keyed by option name; existing entries are overwritten</param>
+        /// <param name="options">the option string; null or a string without '=' leaves the map untouched</param>
+        /// <exception cref="UriFormatException">an option value has unbalanced or empty quotes, or is missing</exception>
+        public static void ParseOptions(IDictionary optionMap, string options)
+        {
+            // Check that there really are some options to parse.
+            if ((options == null) || (options.IndexOf('=') == -1))
+            {
+                return;
+            }
+
+            int optionIndex = options.IndexOf('=');
+            string option = options.Substring(0, optionIndex);
+            int length = options.Length;
+            int nestedQuotes = 0;
+
+            // Ends up as the index of the "'" that closes this option's value.
+            int valueIndex = optionIndex;
+
+            // Walk the string until the quote that closes the value of the first option.
+            while ((nestedQuotes > 0) || (valueIndex < length))
+            {
+                valueIndex++;
+
+                if (valueIndex >= length)
+                {
+                    break;
+                }
+
+                if (options[valueIndex] != '\'')
+                {
+                    continue;
+                }
+
+                if ((valueIndex + 1) >= length)
+                {
+                    // A quote at the very end of the string can only be a closing quote.
+                    nestedQuotes--;
+                    break;
+                }
+
+                char next = options[valueIndex + 1];
+                if ((next == '&') || (next == ',') || (next == ';') || (next == '\''))
+                {
+                    // A quote followed by a separator (or another quote) closes a value.
+                    nestedQuotes--;
+
+                    if (nestedQuotes == 0)
+                    {
+                        // We've found the value of an option
+                        break;
+                    }
+                }
+                else
+                {
+                    nestedQuotes++;
+                }
+            }
+
+            // Skip the "='" that follows the option name.
+            int valueStart = optionIndex + 2;
+            if ((nestedQuotes != 0) || (valueIndex < valueStart))
+            {
+                throw new UriFormatException("Unterminated option in: " + options);
+            }
+
+            optionMap[option] = options.Substring(valueStart, valueIndex - valueStart);
+
+            if (valueIndex < (length - 1))
+            {
+                // Skip the closing quote and the separator, then parse the remaining options.
+                ParseOptions(optionMap, options.Substring(valueIndex + 2));
+            }
+        }        
+    }
+}

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj?rev=668164&r1=668163&r2=668164&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/Qpid.Integration.Tests.csproj Mon Jun 16 07:02:16 2008
@@ -47,6 +47,7 @@
     <Compile Include="testcases\DurableSubscriptionTest.cs" />
     <Compile Include="testcases\HeadersExchangeTest.cs" />
     <Compile Include="testcases\MandatoryMessageTest.cs" />
+    <Compile Include="testcases\ClientAckTests.cs" />
     <Compile Include="testcases\ProducerMultiConsumerTest.cs" />
     <Compile Include="testcases\SslConnectionTest.cs" />
     <Compile Include="testcases\SustainedTest.cs" />
@@ -63,4 +64,10 @@
   <ItemGroup>
     <Compile Include="interop\TestCases\TestCase2BasicP2P.cs" />
   </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="..\Qpid.Client\Qpid.Client.csproj">
+      <Project>{68987C05-3768-452C-A6FC-6BA1D372852F}</Project>
+      <Name>Qpid.Client</Name>
+    </ProjectReference>
+  </ItemGroup>
 </Project>
\ No newline at end of file

Modified: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs?rev=668164&r1=668163&r2=668164&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs (original)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs Mon Jun 16 07:02:16 2008
@@ -297,5 +297,14 @@
             
             return buf.ToString();
         }
+        
+        /// <summary>
+        /// Sends the requested number of text messages through the publisher. The message bodies
+        /// are "Test message " followed by a counter that starts at 0.
+        /// </summary>
+        /// <param name="count">number of messages to send</param>
+        /// <param name="pub">publisher to send through; its channel creates the messages</param>
+        protected void SendMessages(int count, IMessagePublisher pub)
+        {
+            int sent = 0;
+            while (sent < count)
+            {
+                pub.Send(pub.Channel.CreateTextMessage("Test message "+sent));
+                sent++;
+            }
+        }
     }
 }

Added: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs?rev=668164&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs (added)
+++ incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs Mon Jun 16 07:02:16 2008
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+	/// <summary>
+	/// Checks client acknowledgement: acknowledging individual messages versus acknowledging
+	/// a message together with the messages received before it.
+	/// </summary>
+	[TestFixture, Category("Integration")]
+	public class ClientAckTests : BaseMessagingTestFixture
+	{
+		private static readonly ILog log = LogManager.GetLogger(typeof(ClientAckTests));
+		private static readonly string TEST_ROUTING_KEY = "MESSAGE_ACK_TEST_QUEUE";
+
+		/// <summary> The three messages received during set up, in order of arrival. </summary>
+		private IMessage msgA;
+		private IMessage msgB;
+		private IMessage msgC;
+
+		[SetUp]
+		public override void Init()
+		{
+			base.Init();
+
+			// End point 0 provides the producer and end point 1 the consumer; both use client
+			// acknowledge mode on the direct exchange with the same routing key.
+			SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT, 
+			              true, false, null);
+			SetUpConsumerEndPoint();
+
+			// Send 3 messages and get them back
+			SendMessages(3, testProducer[0]);
+			msgA = testConsumer[1].Receive();
+			msgB = testConsumer[1].Receive();
+			msgC = testConsumer[1].Receive();
+		}
+
+		[TearDown]
+		public override void Shutdown()
+		{
+			try
+			{
+				// Clean up after the test.
+				CloseEndPoint(0);
+			}
+			finally
+			{
+				try
+				{
+					// Still attempted when closing end point 0 failed.
+					CloseEndPoint(1);
+				}
+				finally
+				{
+					base.Shutdown();
+				}
+			}
+		}
+
+		/// <summary> Opens end point 1 as the consumer used by every test. </summary>
+		private void SetUpConsumerEndPoint()
+		{
+			SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.ClientAcknowledge, false, ExchangeNameDefaults.DIRECT, 
+			              true, true, testId.ToString());
+		}
+
+		/// <summary>
+		/// Closes the consumer end point and opens it again with the same settings; the tests
+		/// then check which messages are delivered to the new consumer.
+		/// </summary>
+		private void ReopenConsumerEndPoint()
+		{
+			CloseEndPoint(1);
+			SetUpConsumerEndPoint();
+		}
+
+		/// <summary> Send 3 messages, get them back and ack each one, rolling acks up. </summary>
+		[Test]
+		public void TestAckingABCAll()
+		{
+			msgA.Acknowledge();
+			msgB.Acknowledge();
+			msgC.Acknowledge();
+
+			ReopenConsumerEndPoint();
+			ConsumeNMessagesOnly(0, "wibble", testConsumer[1]);
+		}
+
+		/// <summary> Send 3 messages, get them back and ack each one individually. </summary>
+		[Test]
+		public void TestAckingABCIndividual()
+		{
+			msgA.Acknowledge(false);
+			msgB.Acknowledge(false);
+			msgC.Acknowledge(false);
+
+			ReopenConsumerEndPoint();
+			ConsumeNMessagesOnly(0, "wibble", testConsumer[1]);
+		}
+
+		/// <summary> Send 3 messages, get them back and ack the middle one only, rolling acks up. </summary>
+		[Test]
+		public void TestAckingBOnlyAll()
+		{
+			msgB.Acknowledge();
+
+			ReopenConsumerEndPoint();
+			log.Debug("Checking we get the last message back");
+			ConsumeNMessagesOnly(1, "Test message 2", testConsumer[1]);
+		}
+
+		/// <summary> Send 3 messages, get them back and ack the middle one only individually. </summary>
+		[Test]
+		public void TestAckingBOnlyIndividual()
+		{
+			msgB.Acknowledge(false);
+
+			ReopenConsumerEndPoint();
+			ConsumeNMessages(1, "Test message 0", testConsumer[1]);
+			ConsumeNMessagesOnly(1, "Test message 2", testConsumer[1]);
+		}
+
+		/// <summary> Send 3 messages, get them back and ack the last one, rolling acks up. </summary>
+		[Test]
+		public void TestAckingCOnlyAll()
+		{
+			msgC.Acknowledge();
+
+			ReopenConsumerEndPoint();
+			ConsumeNMessagesOnly(0, "wibble", testConsumer[1]);
+		}
+
+		/// <summary> Send 3 messages, get them back and ack the last one individually. </summary>
+		[Test]
+		public void TestAckingCOnlyIndividual()
+		{
+			msgC.Acknowledge(false);
+
+			ReopenConsumerEndPoint();
+			ConsumeNMessages(1, "Test message 0", testConsumer[1]);
+			ConsumeNMessagesOnly(1, "Test message 1", testConsumer[1]);
+		}
+
+		/// <summary> Send 3 messages, get them back and ack the first two individually. </summary>
+		[Test]
+		public void TestAckingAtoBIndivdual()
+		{
+			msgA.Acknowledge(false);
+			msgB.Acknowledge(false);
+
+			ReopenConsumerEndPoint();
+			ConsumeNMessagesOnly(1, "Test message 2", testConsumer[1]);
+		}
+
+		/// <summary> Send 3 messages, get them back and ack the first and last one individually. </summary>
+		[Test]
+		public void TestAckingAandCIndivdual()
+		{
+			msgA.Acknowledge(false);
+			msgC.Acknowledge(false);
+
+			ReopenConsumerEndPoint();
+			ConsumeNMessagesOnly(1, "Test message 1", testConsumer[1]);
+		}
+	}
+}

Propchange: incubator/qpid/branches/M2.1.x/dotnet/Qpid.Integration.Tests/testcases/ClientAckTests.cs
------------------------------------------------------------------------------
    svn:executable = *