You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by to...@apache.org on 2007/05/18 02:51:15 UTC

svn commit: r539198 [1/2] - in /incubator/qpid/trunk/qpid: ./ dotnet/Qpid.Buffer.Tests/ dotnet/Qpid.Buffer/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/BrokerDetails/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Messages/ dotnet/Qp...

Author: tomasr
Date: Thu May 17 17:51:12 2007
New Revision: 539198

URL: http://svn.apache.org/viewvc?view=rev&rev=539198
Log:
Merged revisions 537954-538078,538080-538083,538085-538097,538099-538108,538110-538239,538241-538881,538883-538906,538908-538911,538913-538921,538923-539191 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r537954 | tomasr | 2007-05-14 14:10:59 -0500 (Mon, 14 May 2007) | 4 lines
  
  * QPID-487 (Contributed by Carlos Medina) Fix QpidConnectionInfo.ToString()
  * QPID-485 (Contributed by Carlos Medina) Fix AmqBrokerInfo.Equals()
  * QPID-456 Enforce virtual host names start with '/'
........
  r538035 | tomasr | 2007-05-14 20:33:00 -0500 (Mon, 14 May 2007) | 6 lines
  
  * QPID-452 Improve message classes API
  * Add XML documentation to IChannel and IMessage
  * Add missing BrokerDetailTests
  * Add new tests for message creation and message factories
  * Fix wrong default encoding for text messages
........
  r539178 | tomasr | 2007-05-17 18:50:50 -0500 (Thu, 17 May 2007) | 6 lines
  
  * QPID-492 Fix Race condition in message decoding
  * QPID-249 Make ServiceRequestingClient and ServiceProvidingClient a single, self-contained test
  * Fix incorrect exception message in Qpid.Buffers, improve tests
  * Make ContentBody use a sliced buffer to avoid extra data copy
  * Remove useless tests in Qpid.Client (Blocking IO tests)
........
  r539191 | tomasr | 2007-05-17 19:18:26 -0500 (Thu, 17 May 2007) | 1 line
  
  QPID-490 (Contributed by Carlos Medina) Implement PurgeQueue and DeleteQueue
........

Added:
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/BrokerDetails/
      - copied from r538035, incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/BrokerDetails/
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/BrokerDetails/BrokerDetailsTest.cs
      - copied unchanged from r538035, incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/BrokerDetails/BrokerDetailsTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Channel/
      - copied from r538035, incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Channel/ChannelMessageCreationTests.cs
      - copied unchanged from r538035, incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelMessageCreationTests.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs
      - copied unchanged from r539191, incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Messages/
      - copied from r538035, incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Messages/
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Messages/MessageFactoryRegistryTests.cs
      - copied unchanged from r538035, incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Messages/MessageFactoryRegistryTests.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
      - copied unchanged from r539191, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
      - copied unchanged from r539191, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs
      - copied unchanged from r539191, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs
Removed:
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/bio/
Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/ByteBuffer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/log4net.config
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj
    incubator/qpid/trunk/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IHeaders.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IMessage.cs

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer.Tests/SimpleByteBufferTests.cs Thu May 17 17:51:12 2007
@@ -120,6 +120,9 @@
          Assert.AreEqual(10, buffer.Limit);
          Assert.AreEqual(2, buffer.Position);
          Assert.AreEqual(8, buffer.Remaining);
+         buffer.Rewind();
+         Assert.AreEqual((byte)0x02, buffer.GetByte());
+         Assert.AreEqual((byte)0x03, buffer.GetByte());
       }
 
       [Test]
@@ -326,3 +329,4 @@
 
    } // class SimpleByteBufferTests
 }
+

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/ByteBuffer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/ByteBuffer.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/ByteBuffer.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/ByteBuffer.cs Thu May 17 17:51:12 2007
@@ -809,7 +809,7 @@
       /// </summary>
       /// <param name="position">Position to read from</param>
       /// <returns>The value at the position</returns>
-      public char getChar(int position)
+      public char GetChar(int position)
       {
          return (char)GetUInt16(position);
       }
@@ -941,7 +941,7 @@
       {
          if ( position + length > Limit )
          {
-            throw new BufferUnderflowException("Attempt to read " + length + " byte(s) to buffer where position is " + _position +
+            throw new BufferUnderflowException("Attempt to read " + length + " byte(s) to buffer where position is " + position +
                                                " and limit is " + Limit);
          }
       }
@@ -954,7 +954,7 @@
          }
          if ( position + length > Limit )
          {
-            throw new BufferOverflowException("Attempt to write " + length + " byte(s) to buffer where position is " + _position +
+            throw new BufferOverflowException("Attempt to write " + length + " byte(s) to buffer where position is " + position +
                                               " and limit is " + Limit);
          }
       }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs Thu May 17 17:51:12 2007
@@ -38,7 +38,7 @@
 
         private const int MESSAGE_COUNT = 1000;
 
-        private const string MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk  ";
+        private const string MESSAGE_DATA_BYTES = "****jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk  ";
 
         AutoResetEvent _finishedEvent = new AutoResetEvent(false);
 
@@ -100,7 +100,9 @@
                 _logger.Info("All messages received");
                 _finishedEvent.Set();
             }
-        }
+            if ( newCount % 100 == 0 ) 
+               System.Diagnostics.Debug.WriteLine(((ITextMessage)m).Text);
+         }
         
         [Test]
         public void RunTest()
@@ -110,7 +112,7 @@
                 ITextMessage msg;
                 try
                 {
-                    msg = _channel.CreateTextMessage(GetData(512 + 8*i));
+                   msg = _channel.CreateTextMessage(GetData(512 + 8*i));
                 }
                 catch (Exception e)
                 {

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj Thu May 17 17:51:12 2007
@@ -44,7 +44,10 @@
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
-    <Compile Include="bio\BlockingIo.cs" />
+    <Compile Include="BrokerDetails\BrokerDetailsTest.cs" />
+    <Compile Include="Channel\ChannelMessageCreationTests.cs" />
+    <Compile Include="Channel\ChannelQueueTest.cs" />
+    <Compile Include="Messages\MessageFactoryRegistryTests.cs" />
     <Compile Include="connection\ConnectionTest.cs" />
     <Compile Include="connection\SslConnectionTest.cs" />
     <Compile Include="failover\FailoverTest.cs" />

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/log4net.config
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/log4net.config?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/log4net.config (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/log4net.config Thu May 17 17:51:12 2007
@@ -49,8 +49,12 @@
     <level value="info" />
     <appender-ref ref="ioLog"/>
   </logger>
+   <logger name="Qpid.Framing.FieldTable" additivity="false">
+      <level value="debug" />
+      <appender-ref ref="console"/>
+   </logger>
 
-  <root>
+   <root>
     <appender-ref ref="console"/>
     <appender-ref ref="UdpAppender"/>
 	  <appender-ref ref="filelog"/>

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs Thu May 17 17:51:12 2007
@@ -35,16 +35,15 @@
 
         private string _replyToExchangeName;
         private string _replyToRoutingKey;
+        const int PACK = 100;
 
         private IMessagePublisher _destinationPublisher;
+        private IMessageConsumer _consumer;
 
         private string _serviceName = "ServiceQ1";
 
         private string _selector = null;
 
-        //private EventWaitHandle _event = new ManualResetEvent(false);
-        private AutoResetEvent _event = new AutoResetEvent(false);
-
         [SetUp]
         public override void Init()
         {            
@@ -59,36 +58,38 @@
             
             _channel.DeclareQueue(_serviceName, false, false, false);
 
-            IMessageConsumer consumer = _channel.CreateConsumerBuilder(_serviceName)
+            _consumer = _channel.CreateConsumerBuilder(_serviceName)
                 .WithPrefetchLow(100)
                 .WithPrefetchHigh(500)
                 .WithNoLocal(true)
                 .Create();
-            consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+            _consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+        }
+
+        public override void Shutdown()
+        {
+           _consumer.Dispose();
+           base.Shutdown();
         }
 
         private void OnConnectionException(Exception e)
         {
             _logger.Info("Connection exception occurred", e);
-            _event.Set(); // Shutdown test on error
             // XXX: Test still doesn't shutdown when broker terminates. Is there no heartbeat?
         }
 
         [Test]
-        public void TestFail()
-        {
-            Assert.Fail("Tests in this class do not run on autopilot, but hang forever, so commented out until can be fixed.");
-        }
-
-        /*[Test]
         public void Test()
         {
             _connection.Start();
             _logger.Info("Waiting...");
-            _event.WaitOne();
-        }*/
 
-        public void OnMessage(IMessage message)
+            ServiceRequestingClient client = new ServiceRequestingClient();
+            client.Init();
+            client.SendMessages();
+        }
+
+        private void OnMessage(IMessage message)
         {
 //            _logger.Info("Got message '" + message + "'");
 
@@ -109,9 +110,9 @@
                 _destinationPublisher = _channel.CreatePublisherBuilder()
                     .WithExchangeName(_replyToExchangeName)
                     .WithRoutingKey(_replyToRoutingKey)
+                    .WithDeliveryMode(DeliveryMode.NonPersistent)
                     .Create();
                 _destinationPublisher.DisableMessageTimestamp = true;
-                _destinationPublisher.DeliveryMode = DeliveryMode.NonPersistent;
                 _logger.Debug("After create a producer");
             }
             catch (QpidException e)
@@ -120,7 +121,7 @@
                 throw e;
             }
             _messageCount++;
-            if (_messageCount % 1000 == 0)
+            if (_messageCount % PACK == 0)
             {
                 _logger.Info("Received message total: " + _messageCount);
                 _logger.Info(string.Format("Sending response to '{0}:{1}'", 
@@ -129,25 +130,20 @@
 
             try
             {
-                String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text;
-                ITextMessage msg = _channel.CreateTextMessage(payload);
-                if (tm.Headers.Contains("timeSent"))
-                {
-//                    _logger.Info("timeSent property set on message");
-//                    _logger.Info("timeSent value is: " + tm.Headers["timeSent"]);
-                    msg.Headers["timeSent"] = tm.Headers["timeSent"];
-                }
-                _destinationPublisher.Send(msg);
-                if (_messageCount % 1000 == 0)
-                {
-                    _logger.Info(string.Format("Sending response to '{0}:{1}'",
-                                               _replyToExchangeName, _replyToRoutingKey));
-                }
-            }
-            catch (QpidException e)
+               String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text;
+               ITextMessage msg = _channel.CreateTextMessage(payload);
+               if ( tm.Headers.Contains("timeSent") )
+               {
+                  msg.Headers["timeSent"] = tm.Headers["timeSent"];
+               }
+               _destinationPublisher.Send(msg);
+            } catch ( QpidException e )
             {
-                _logger.Error("Error sending message: " + e, e);
-                throw e;
+               _logger.Error("Error sending message: " + e, e);
+               throw e;
+            } finally
+            {
+               _destinationPublisher.Dispose();
             }
         }               
     }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs Thu May 17 17:51:12 2007
@@ -26,18 +26,18 @@
 
 namespace Qpid.Client.Tests
 {
-    [TestFixture]
     public class ServiceRequestingClient : BaseMessagingTestFixture
     {
         private const int MESSAGE_SIZE = 1024;
         private static string MESSAGE_DATA = new string('x', MESSAGE_SIZE);
 
-        private const int NUM_MESSAGES = 10000;
+        private const int PACK = 100;
+        private const int NUM_MESSAGES = PACK*10; // increase when in standalone
 
         private static ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient));
 
-        AutoResetEvent _finishedEvent = new AutoResetEvent(false);
-
+        ManualResetEvent _finishedEvent = new ManualResetEvent(false);
+        
         private int _expectedMessageCount = NUM_MESSAGES;
 
         private long _startTime = 0;        
@@ -54,9 +54,9 @@
             {
                 _publisher = _channel.CreatePublisherBuilder()
                     .WithRoutingKey(_commandQueueName)
+                    .WithDeliveryMode(DeliveryMode.NonPersistent)
                     .Create();
                 _publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder?
-                _publisher.DeliveryMode = DeliveryMode.NonPersistent;  // XXX: need a "with" for this in builder?
             }
             catch (QpidException e)
             {
@@ -64,7 +64,7 @@
             }
         }
         
-        /*[Test]
+        [Test]
         public void SendMessages()
         {
             InitialiseProducer();
@@ -100,47 +100,18 @@
                 // Added timestamp.
                 long timeNow = DateTime.Now.Ticks;
                 string timeSentString = String.Format("{0:G}", timeNow);
-//                _log.Info(String.Format("timeSent={0} timeSentString={1}", timeNow, timeSentString));
-                msg.Headers.SetString("timeSent", timeSentString);
-                //msg.Headers.SetLong("sentAt", timeNow);
+                msg.Headers.SetLong("timeSent", timeNow);
                 
-                try
-                {
-                    _publisher.Send(msg);
-                }
-                catch (Exception e)
-                {
-                    _log.Error("Error sending message: " + e, e);
-                    //base._port = 5673;
-                    _log.Info("Reconnecting but on port 5673");
-                    try
-                    {
-                        base.Init();
-                        InitialiseProducer();
-                        // cheesy but a quick test
-                        _log.Info("Calling SendMessages again");
-                        SendMessages();
-                    }
-                    catch (Exception ex)
-                    {
-                        _log.Error("Totally busted: failed to reconnect: " + ex, ex);
-                    }
-                }
+                 _publisher.Send(msg);
             }
 
             // Assert that the test finishes within a reasonable amount of time.
-            const int waitSeconds = 10;
+            const int waitSeconds = 40;
             const int waitMilliseconds = waitSeconds * 1000;
             _log.Info("Finished sending " + _expectedMessageCount + " messages");
             _log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds));
             Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false), 
                           String.Format("Expected to finish in {0} seconds", waitSeconds));
-        }*/
-
-        [Test]
-        public void TestFail()
-        {
-            Assert.Fail("Tests in this class do not run on autopilot, but hang forever, so commented out until can be fixed.");
         }
 
         public void OnMessage(IMessage m)
@@ -150,23 +121,19 @@
                 _log.Debug("Message received: " + m);
             }
 
-            //if (m.Headers.Contains("sentAt"))
             if (!m.Headers.Contains("timeSent"))
             {
                 throw new Exception("Set timeSent!");
             }
-            //long sentAt = m.Headers.GetLong("sentAt");
-            long sentAt = Int64.Parse(m.Headers.GetString("timeSent"));
+
+            long sentAt = m.Headers.GetLong("timeSent");
             long now = DateTime.Now.Ticks;
             long latencyTicks = now - sentAt;
-//                _log.Info(String.Format("latency = {0} ticks ", latencyTicks));
             long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond;
-//                _log.Info(String.Format("latency = {0} ms", latencyMilliseconds));
 
             averager.Add(latencyMilliseconds);
 
-            // Output average every 1000 messages.
-            if (averager.Num % 1000 == 0)
+            if (averager.Num % PACK == 0)
             {
                 _log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond);
                 _log.Info(String.Format("Average latency (ms) = {0}", averager));
@@ -185,13 +152,6 @@
                 _finishedEvent.Set(); // Notify main thread to quit.
             }
         }
-        
-        /*public static void Main(String[] args)
-        {
-            ServiceRequestingClient c = new ServiceRequestingClient();
-            c.Init();
-            c.SendMessages();
-        }*/
     }
     
     class Avergager

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/url/ConnectionUrlTest.cs Thu May 17 17:51:12 2007
@@ -404,5 +404,43 @@
                 Assert.AreEqual("Unterminated option", e.Message);
             }
         }
+
+        [Test]
+        public void ValidateQpidConnectionInfoFromToString()
+        {
+            String url = "amqp://ritchiem:bob@default/temp?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
+
+            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(url);
+            IConnectionInfo connectionInfo1 = QpidConnectionInfo.FromUrl(connectionInfo.ToString());
+
+            Console.WriteLine(connectionInfo.ToString());
+            Console.WriteLine(connectionInfo1.ToString());
+
+            Assert.AreEqual(connectionInfo.Username, connectionInfo1.Username);
+            Assert.AreEqual(connectionInfo.Password, connectionInfo1.Password);
+            Assert.AreEqual(connectionInfo.VirtualHost, connectionInfo1.VirtualHost);
+
+            Assert.IsTrue((connectionInfo1.GetAllBrokerInfos().Count == 2));
+            Assert.IsTrue(connectionInfo.GetBrokerInfo(0).Equals(connectionInfo1.GetBrokerInfo(0)));
+            Assert.IsTrue(connectionInfo.GetBrokerInfo(1).Equals(connectionInfo1.GetBrokerInfo(1)));
+
+        }
+
+        [Test]
+        public void EnsureVirtualHostStartsWithSlash()
+        {
+           IConnectionInfo connection = new QpidConnectionInfo();
+           connection.VirtualHost = "test";
+           Assert.AreEqual("/test", connection.VirtualHost);
+
+           connection.VirtualHost = "/mytest";
+           Assert.AreEqual("/mytest", connection.VirtualHost);
+
+           connection.VirtualHost = "";
+           Assert.AreEqual("/", connection.VirtualHost);
+
+           connection.VirtualHost = null;
+           Assert.AreEqual("/", connection.VirtualHost);
+        }
     }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs Thu May 17 17:51:12 2007
@@ -260,7 +260,7 @@
             sb.Append(_transport);
             sb.Append("://");
 
-            if (!(_transport.ToLower().Equals("vm")))
+            if (!(StringEqualsIgnoreCase(_transport, "vm")))
             {
                 sb.Append(_host);
             }
@@ -268,8 +268,7 @@
             sb.Append(':');
             sb.Append(_port);
 
-            // XXX
-//            sb.Append(printOptionsURL());
+            sb.Append(URLHelper.printOptions(_options));
 
             return sb.ToString();
         }
@@ -284,7 +283,8 @@
 	        IBrokerInfo bd = (IBrokerInfo) obj;
 	        return StringEqualsIgnoreCase(_host, bd.Host) &&
 	        	_port == bd.Port &&
-                _transport == bd.Transport;
+                StringEqualsIgnoreCase(_transport, bd.Transport) &&
+                UseSSL == bd.UseSSL;
         }
     	
 		public override int GetHashCode()
@@ -297,35 +297,6 @@
         {
             return one.ToLower().Equals(two.ToLower());
         }
-
-//        private string printOptionsURL()
-//        {
-//            stringBuffer optionsURL = new stringBuffer();
-//
-//            optionsURL.Append('?');
-//
-//            if (!(_options.isEmpty()))
-//            {
-//
-//                for (string key : _options.keySet())
-//                {
-//                    optionsURL.Append(key);
-//
-//                    optionsURL.Append("='");
-//
-//                    optionsURL.Append(_options.get(key));
-//
-//                    optionsURL.Append("'");
-//
-//                    optionsURL.Append(URLHelper.DEFAULT_OPTION_SEPERATOR);
-//                }
-//            }
-//
-//            //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
-//            optionsURL.deleteCharAt(optionsURL.length() - 1);
-//
-//            return optionsURL.tostring();
-//        }
 
         public bool UseSSL
         {

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Thu May 17 17:51:12 2007
@@ -36,7 +36,7 @@
     {
         private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel));
 
-        private const int BASIC_CONTENT_TYPE = 60;
+        internal const int BASIC_CONTENT_TYPE = 60;
 
         private static int _nextSessionNumber = 0;
 
@@ -122,7 +122,7 @@
 
                     if (consumer == null)
                     {
-                        _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a handler - ignoring...");
+                        _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring...");
                     }
                     else
                     {
@@ -156,103 +156,72 @@
             }
         }
 
-        internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) :
-            this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.NewDefaultRegistry(), defaultPrefetch)
-        {
-        }
-
         /// <summary>
         /// Initializes a new instance of the <see cref="AmqChannel"/> class.
         /// </summary>
-        /// <param name="con">The con.</param>
+        /// <param name="con">The connection.</param>
         /// <param name="channelId">The channel id.</param>
         /// <param name="transacted">if set to <c>true</c> [transacted].</param>
         /// <param name="acknowledgeMode">The acknowledge mode.</param>
-        /// <param name="messageFactoryRegistry">The message factory registry.</param>
-        internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode,
-                            MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
+        /// <param name="defaultPrefetch">Default prefetch value</param>
+        internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch)
+           : this()
+        {
+           _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
+           _connection = con;
+           _transacted = transacted;
+           if ( transacted )
+           {
+              _acknowledgeMode = AcknowledgeMode.SessionTransacted;
+           } else
+           {
+              _acknowledgeMode = acknowledgeMode;
+           }
+           _channelId = channelId;
+        }
+        
+        private AmqChannel()
         {
-            _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
-            _connection = con;
-            _transacted = transacted;
-            if (transacted)
-            {
-                _acknowledgeMode = AcknowledgeMode.SessionTransacted;
-            }
-            else
-            {
-                _acknowledgeMode = acknowledgeMode;
-            }
-            _channelId = channelId;
-            _messageFactoryRegistry = messageFactoryRegistry;
+           _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
+        }
+
+        /// <summary>
+        /// Create a disconnected channel that will fault
+        /// for most things, but is useful for testing
+        /// </summary>
+        /// <returns>A new disconnected channel</returns>
+        public static IChannel CreateDisconnectedChannel()
+        {
+           return new AmqChannel();
         }
 
+
         public IBytesMessage CreateBytesMessage()
         {
-            lock (_connection.FailoverMutex)
-            {
-                CheckNotClosed();
-                try
-                {
-                    return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
-                }
-                catch (AMQException e)
-                {
-                    throw new QpidException("Unable to create message: " + e);
-                }
-            }
+            return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
         }
 
         public IMessage CreateMessage()
         {
-            lock (_connection.FailoverMutex)
-            {
-                CheckNotClosed();
-                try
-                {
-                    // TODO: this is supposed to create a message consisting only of message headers
-                    return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
-                }
-                catch (AMQException e)
-                {
-                    throw new QpidException("Unable to create message: " + e);
-                }
-            }
+            // TODO: this is supposed to create a message consisting only of message headers
+            return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+        }
+        
+        public IMessage CreateMessage(string mimeType)
+        {
+           return _messageFactoryRegistry.CreateMessage(mimeType);
         }
 
         public ITextMessage CreateTextMessage()
         {
-            lock (_connection.FailoverMutex)
-            {
-                CheckNotClosed();
-
-                try
-                {
-                    return (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
-                }
-                catch (AMQException e)
-                {
-                    throw new QpidException("Unable to create message: " + e);
-                }
-            }
+            return CreateTextMessage(String.Empty);
         }
 
         public ITextMessage CreateTextMessage(string text)
         {
-            lock (_connection.FailoverMutex)
-            {
-                CheckNotClosed();
-                try
-                {
-                    ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
-                    msg.Text = text;
-                    return msg;
-                }
-                catch (AMQException e)
-                {
-                    throw new QpidException("Unable to create message: " + e);
-                }
-            }
+            ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
+            msg.Text = text;
+            return msg;
         }
 
         public bool Transacted
@@ -538,11 +507,6 @@
             }
         }
 
-        public IFieldTable CreateFieldTable()
-        {
-            return new FieldTable();
-        }
-
         public void Unsubscribe(String name)
         {
             throw new NotImplementedException(); // FIXME
@@ -709,6 +673,30 @@
             // at this point the _consumers map will be empty
         }
 
+        /// <summary>
+        /// Purges the named queue. All of the work is delegated to <c>DoPurgeQueue</c>.
+        /// </summary>
+        /// <param name="queueName">Name of the queue to purge.</param>
+        /// <param name="noWait">
+        /// Passed through to <c>DoPurgeQueue</c>: when <c>true</c> the purge frame is written
+        /// without requesting a <c>QueuePurgeOkBody</c> reply.
+        /// </param>
+        public void PurgeQueue(string queueName, bool noWait)
+        {
+            DoPurgeQueue(queueName, noWait);
+        }
+
+        /// <summary>
+        /// Builds a queue purge frame for this channel and writes it to the connection.
+        /// An <c>AMQException</c> raised while writing propagates to the caller unchanged.
+        /// </summary>
+        /// <param name="queueName">Name of the queue to purge.</param>
+        /// <param name="noWait">
+        /// When <c>true</c> the frame goes through the plain protocol writer; otherwise it is
+        /// written with <c>SyncWrite</c>, naming <c>QueuePurgeOkBody</c> as the expected reply.
+        /// </param>
+        private void DoPurgeQueue(string queueName, bool noWait)
+        {
+            _logger.DebugFormat("PurgeQueue {0}", queueName);
+
+            AMQFrame purgeFrame = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait);
+
+            if (!noWait)
+            {
+                _connection.ConvenientProtocolWriter.SyncWrite(purgeFrame, typeof(QueuePurgeOkBody));
+                return;
+            }
+
+            _connection.ProtocolWriter.Write(purgeFrame);
+        }
+
         /**
          * Replays frame on fail over.
          * 
@@ -784,132 +772,44 @@
             throw new NotImplementedException(); // FIXME
         }
 
-        public void DeleteQueue()
-        {
-            throw new NotImplementedException(); // FIXME
-        }
-
-        public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
-        {
-            return new MessageConsumerBuilder(this, queueName);
-        }
-
-        public MessagePublisherBuilder CreatePublisherBuilder()
-        {
-            return new MessagePublisherBuilder(this);
-        }
-
-        internal void BasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate,
-                                   AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
-                                   bool disableTimestamps)
+        public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
         {
-            DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
+            DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
         }
 
-        private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
+        private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
         {
-            AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(_channelId, 0, exchangeName,
-                                                                    routingKey, mandatory, immediate);
-
-            long currentTime = 0;
-            if (!disableTimestamps)
-            {
-                currentTime = DateTime.UtcNow.Ticks;
-                message.Timestamp = currentTime;
-            }
-
-            ByteBuffer buf = message.Data;
-            byte[] payload = null;
-            if (buf != null)
+            try
             {
-                payload = new byte[buf.Remaining];
-                buf.GetBytes(payload);
-            }
-            BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties;
+                _logger.Debug(string.Format("DeleteQueue name={0}", queueName));
+                
+                AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0,
+                    queueName, // queueName
+                    ifUnused, // IfUnUsed
+                    ifEmpty, // IfEmpty
+                    noWait); // NoWait
 
-            if (timeToLive > 0)
-            {
-                if (!disableTimestamps)
-                {
-                    contentHeaderProperties.Expiration = currentTime + timeToLive;
-                }
-            }
-            else
-            {
-                contentHeaderProperties.Expiration = 0;
-            }
-            contentHeaderProperties.SetDeliveryMode(deliveryMode);
-            contentHeaderProperties.Priority = (byte)priority;
+                _replayFrames.Add(queueDelete);
 
-            ContentBody[] contentBodies = CreateContentBodies(payload);
-            AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
-            for (int i = 0; i < contentBodies.Length; i++)
-            {
-                frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
-            }
-            if (contentBodies.Length > 0 && _logger.IsDebugEnabled)
-            {
-                _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+                if (noWait)
+                    _connection.ProtocolWriter.Write(queueDelete);
+                else
+                    _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
             }
-
-            // weight argument of zero indicates no child content headers, just bodies
-            AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(_channelId, BASIC_CONTENT_TYPE, 0, contentHeaderProperties,
-                                                                           (uint)payload.Length);
-            if (_logger.IsDebugEnabled)
+            catch (AMQException)
             {
-                _logger.Debug(string.Format("Sending content header frame to  {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+                throw;
             }
-
-            frames[0] = publishFrame;
-            frames[1] = contentHeaderFrame;
-            CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-
-            lock (_connection.FailoverMutex) {
-                _connection.ProtocolWriter.Write(compositeFrame);
-            }   
         }
 
-        /// <summary>
-        /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
-        /// maximum frame size.
-        /// </summary>
-        /// <param name="payload"></param>
-        /// <returns>return the array of content bodies</returns>
-        private ContentBody[] CreateContentBodies(byte[] payload)
+        public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
         {
-            if (payload == null)
-            {
-                return null;
-            }
-            else if (payload.Length == 0)
-            {
-                return new ContentBody[0];
-            }
-            // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
-            // (0xCE byte).
-            long framePayloadMax = Connection.MaximumFrameSize - 1;
-            int lastFrame = (payload.Length % framePayloadMax) > 0 ? 1 : 0;
-            int frameCount = (int)(payload.Length / framePayloadMax) + lastFrame;
-            ContentBody[] bodies = new ContentBody[frameCount];
+            return new MessageConsumerBuilder(this, queueName);
+        }
 
-            if (frameCount == 1)
-            {
-                bodies[0] = new ContentBody();
-                bodies[0].Payload = payload;
-            }
-            else
-            {
-                long remaining = payload.Length;
-                for (int i = 0; i < bodies.Length; i++)
-                {
-                    bodies[i] = new ContentBody();
-                    byte[] framePayload = new byte[(remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining];
-                    Array.Copy(payload, (int)framePayloadMax * i, framePayload, 0, framePayload.Length);
-                    bodies[i].Payload = framePayload;
-                    remaining -= framePayload.Length;
-                }
-            }
-            return bodies;
+        public MessagePublisherBuilder CreatePublisherBuilder()
+        {
+            return new MessagePublisherBuilder(this);
         }
 
         public string GenerateUniqueName()

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs Thu May 17 17:51:12 2007
@@ -21,255 +21,359 @@
 using System;
 using System.Threading;
 using log4net;
+using Qpid.Buffer;
 using Qpid.Client.Message;
 using Qpid.Messaging;
+using Qpid.Framing;
 
 namespace Qpid.Client
 {
-    public class BasicMessageProducer : Closeable, IMessagePublisher
-    {
-        protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
-
-        /// <summary>
-        /// If true, messages will not get a timestamp.
-        /// </summary>
-        private bool _disableTimestamps;
-
-        /// <summary>
-        /// Priority of messages created by this producer.
-        /// </summary>
-        private int _messagePriority;
-
-        /// <summary>
-        /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
-        ///
-        private long _timeToLive;
-
-        /// <summary>
-        /// Delivery mode used for this producer.
-        /// </summary>
-        private DeliveryMode _deliveryMode;
-
-        private bool _immediate;
-        private bool _mandatory;
-
-        string _exchangeName;
-        string _routingKey;
-
-        /// <summary>
-        /// Default encoding used for messages produced by this producer.
-        /// </summary>
-        private string _encoding;
-
-        /// <summary>
-        /// Default encoding used for message produced by this producer.
-        /// </summary>
-        private string _mimeType;
-
-        /// <summary>
-        /// True if this producer was created from a transacted session
-        /// </summary>
-        private bool _transacted;
-
-        private ushort _channelId;
-
-        /// <summary>
-        /// This is an id generated by the session and is used to tie individual producers to the session. This means we
-        /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
-        /// to the session so that when an error is propagated to the session it can close the producer (meaning that
-        /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
-        /// </summary>
-        private long _producerId;
-
-        /// <summary>
-        /// The session used to create this producer
-        /// </summary>
-        private AmqChannel _channel;
-
-        /// <summary>
-        /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
-        /// </summary>
-        protected const bool DEFAULT_IMMEDIATE = false;
-
-        /// <summary>
-        /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
-        /// connected to the exchange for the message
-        /// </summary>
-        protected const bool DEFAULT_MANDATORY = true;
-
-        public BasicMessageProducer(string exchangeName, string routingKey,
-            bool transacted, 
-            ushort channelId,
-            AmqChannel channel, 
-            long producerId,
-            DeliveryMode deliveryMode,
-            long timeToLive, 
-            bool immediate, 
-            bool mandatory, 
-            int priority)
-        {
-            _exchangeName = exchangeName;
-            _routingKey = routingKey;
-            _transacted = transacted;
-            _channelId = channelId;
-            _channel = channel;
-            _producerId = producerId;
-            _deliveryMode = deliveryMode;
-            _timeToLive = timeToLive;
-            _immediate = immediate;
-            _mandatory = mandatory;
-            _messagePriority = priority;
-            
-            _channel.RegisterProducer(producerId, this);
-        }
-
-
-        #region IMessagePublisher Members
-
-        public DeliveryMode DeliveryMode
-        {
-            get
-            {
-                CheckNotClosed();
-                return _deliveryMode;
-            }
-            set
-            {
-                CheckNotClosed();
-                _deliveryMode = value;
-            }
-        }
-
-        public string ExchangeName
-        {
-            get { return _exchangeName; }
-        }
-
-        public string RoutingKey
-        {
-            get { return _routingKey; }
-        }
-
-        public bool DisableMessageID
-        {
-            get
-            {
-                throw new Exception("The method or operation is not implemented.");
-            }
-            set
-            {
-                throw new Exception("The method or operation is not implemented.");
-            }
-        }
-
-        public bool DisableMessageTimestamp
-        {
-            get
-            {
-                CheckNotClosed();
-                return _disableTimestamps;
-            }
-            set
-            {
-                CheckNotClosed();
-                _disableTimestamps = value;
-            }
-        }
-
-        public int Priority
-        {
-            get
-            {
-                CheckNotClosed();
-                return _messagePriority;
-            }
-            set
-            {
-                CheckNotClosed();
-                if (value < 0 || value > 9)
-                {
-                    throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
-                }
-                _messagePriority = value;
-            }
-        }
-
-        public override void Close()
-        {
-            _logger.Debug("Closing producer " + this);
-            Interlocked.Exchange(ref _closed, CLOSED);
-            _channel.DeregisterProducer(_producerId);
-        }
-
-        public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
-        {
-            CheckNotClosed();
-            SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
-                     DEFAULT_IMMEDIATE);
-        }
-
-        public void Send(IMessage msg)
-        {
-            CheckNotClosed();
-            SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
-                     DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
-        }
-
-        // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
-        // to facilitate publishing messages to potentially non-existent recipients.
-        public void Send(IMessage msg, bool mandatory)
-        {
-            CheckNotClosed();
-            SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
-                     mandatory, DEFAULT_IMMEDIATE);
-        }
-
-        public long TimeToLive
-        {
-            get
-            {
-                CheckNotClosed();
-                return _timeToLive;
-            }
-            set
+   public class BasicMessageProducer : Closeable, IMessagePublisher
+   {
+      protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
+
+      /// <summary>
+      /// If true, messages will not get a timestamp.
+      /// </summary>
+      private bool _disableTimestamps;
+
+      /// <summary>
+      /// Priority of messages created by this producer.
+      /// </summary>
+      private int _messagePriority;
+
+      /// <summary>
+      /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+      /// </summary>
+      private long _timeToLive;
+
+      /// <summary>
+      /// Delivery mode used for this producer.
+      /// </summary>
+      private DeliveryMode _deliveryMode;
+
+      private bool _immediate;
+      private bool _mandatory;
+
+      string _exchangeName;
+      string _routingKey;
+
+      /// <summary>
+      /// Default encoding used for messages produced by this producer.
+      /// </summary>
+      private string _encoding;
+
+      /// <summary>
+      /// MIME type configured for this producer through the MimeType property setter.
+      /// </summary>
+      private string _mimeType;
+
+      /// <summary>
+      /// True if this producer was created from a transacted session
+      /// </summary>
+      private bool _transacted;
+
+      private ushort _channelId;
+
+      /// <summary>
+      /// This is an id generated by the session and is used to tie individual producers to the session. This means we
+      /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+      /// to the session so that when an error is propagated to the session it can close the producer (meaning that
+      /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+      /// </summary>
+      private long _producerId;
+
+      /// <summary>
+      /// The session used to create this producer
+      /// </summary>
+      private AmqChannel _channel;
+
+      /// <summary>
+      /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
+      /// </summary>
+      protected const bool DEFAULT_IMMEDIATE = false;
+
+      /// <summary>
+      /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
+      /// connected to the exchange for the message
+      /// </summary>
+      protected const bool DEFAULT_MANDATORY = true;
+
+      public BasicMessageProducer(string exchangeName, string routingKey,
+          bool transacted,
+          ushort channelId,
+          AmqChannel channel,
+          long producerId,
+          DeliveryMode deliveryMode,
+          long timeToLive,
+          bool immediate,
+          bool mandatory,
+          int priority)
+      {
+         _exchangeName = exchangeName;
+         _routingKey = routingKey;
+         _transacted = transacted;
+         _channelId = channelId;
+         _channel = channel;
+         _producerId = producerId;
+         _deliveryMode = deliveryMode;
+         _timeToLive = timeToLive;
+         _immediate = immediate;
+         _mandatory = mandatory;
+         _messagePriority = priority;
+
+         _channel.RegisterProducer(producerId, this);
+      }
+
+
+      #region IMessagePublisher Members
+
+      public DeliveryMode DeliveryMode
+      {
+         get
+         {
+            CheckNotClosed();
+            return _deliveryMode;
+         }
+         set
+         {
+            CheckNotClosed();
+            _deliveryMode = value;
+         }
+      }
+
+      public string ExchangeName
+      {
+         get { return _exchangeName; }
+      }
+
+      public string RoutingKey
+      {
+         get { return _routingKey; }
+      }
+
+      /// <summary>
+      /// Not implemented by this producer: both the getter and the setter throw.
+      /// </summary>
+      /// <exception cref="NotImplementedException">Always thrown.</exception>
+      public bool DisableMessageID
+      {
+         get
+         {
+            // NotImplementedException (still an Exception for existing catch blocks) tells
+            // callers precisely what is wrong, unlike a bare System.Exception
+            throw new NotImplementedException("The method or operation is not implemented.");
+         }
+         set
+         {
+            throw new NotImplementedException("The method or operation is not implemented.");
+         }
+      }
+
+      public bool DisableMessageTimestamp
+      {
+         get
+         {
+            CheckNotClosed();
+            return _disableTimestamps;
+         }
+         set
+         {
+            CheckNotClosed();
+            _disableTimestamps = value;
+         }
+      }
+
+      public int Priority
+      {
+         get
+         {
+            CheckNotClosed();
+            return _messagePriority;
+         }
+         set
+         {
+            CheckNotClosed();
+            if ( value < 0 || value > 9 )
             {
-                CheckNotClosed();
-                if (value < 0)
-                {
-                    throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
-                }
-                _timeToLive = value;
+               throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
             }
-        }
-
-        #endregion
-
-        private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
-        {
-            _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps);
-        }
-
-        public string MimeType
-        {
-            set
+            _messagePriority = value;
+         }
+      }
+
+      public override void Close()
+      {
+         _logger.Debug("Closing producer " + this);
+         Interlocked.Exchange(ref _closed, CLOSED);
+         _channel.DeregisterProducer(_producerId);
+      }
+
+      /// <summary>
+      /// Sends a message to this producer's exchange and routing key, using the supplied
+      /// delivery mode, priority and time to live instead of the producer's own settings.
+      /// </summary>
+      /// <param name="msg">Message to send; it is cast to <c>AbstractQmsMessage</c>.</param>
+      /// <param name="deliveryMode">Delivery mode for this message.</param>
+      /// <param name="priority">Priority for this message, in the range 0 to 9.</param>
+      /// <param name="timeToLive">Time to live for this message; must be non-negative and fit in a <c>uint</c>.</param>
+      /// <exception cref="ArgumentOutOfRangeException">
+      /// <paramref name="priority"/> or <paramref name="timeToLive"/> is out of range.
+      /// </exception>
+      public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
+      {
+         CheckNotClosed();
+         // Enforce the same ranges as the Priority and TimeToLive property setters. Without
+         // the check a negative time to live wraps to a huge value in the uint cast below.
+         if ( priority < 0 || priority > 9 )
+         {
+            throw new ArgumentOutOfRangeException("priority", "Priority of " + priority + " is illegal. Value must be in range 0 to 9");
+         }
+         if ( timeToLive < 0 || timeToLive > uint.MaxValue )
+         {
+            throw new ArgumentOutOfRangeException("timeToLive", "Time to live must be non-negative - supplied value was " + timeToLive);
+         }
+         // NOTE(review): the mandatory/immediate values given to the constructor are not used
+         // here; the DEFAULT_* constants are sent instead - confirm this is intended.
+         SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
+                  DEFAULT_IMMEDIATE);
+      }
+
+      public void Send(IMessage msg)
+      {
+         CheckNotClosed();
+         SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+                  DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+      }
+
+      // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
+      // to facilitate publishing messages to potentially non-existent recipients.
+      public void Send(IMessage msg, bool mandatory)
+      {
+         CheckNotClosed();
+         SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+                  mandatory, DEFAULT_IMMEDIATE);
+      }
+
+      public long TimeToLive
+      {
+         get
+         {
+            CheckNotClosed();
+            return _timeToLive;
+         }
+         set
+         {
+            CheckNotClosed();
+            if ( value < 0 )
             {
-                CheckNotClosed();
-                _mimeType = value;
+               throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
             }
-        }
+            _timeToLive = value;
+         }
+      }
 
-        public string Encoding
-        {
-            set
-            {
-                CheckNotClosed();
-                _encoding = value;
-            }
-        }
+      #endregion
 
-        public void Dispose()
-        {
-            Close();
-        }
-    }   
+      public string MimeType
+      {
+         set
+         {
+            CheckNotClosed();
+            _mimeType = value;
+         }
+      }
+
+      public string Encoding
+      {
+         set
+         {
+            CheckNotClosed();
+            _encoding = value;
+         }
+      }
+
+      public void Dispose()
+      {
+         Close();
+      }
+
+      #region Message Publishing
+
+      /// <summary>
+      /// Publishes a message: builds the publish frame, the content header frame and the
+      /// content body frames, then writes them to the connection as one composite data block
+      /// while holding the connection's failover mutex.
+      /// </summary>
+      /// <param name="exchangeName">Exchange name placed in the publish frame.</param>
+      /// <param name="routingKey">Routing key placed in the publish frame.</param>
+      /// <param name="message">Message to send; its timestamp, expiration, delivery mode and priority are overwritten.</param>
+      /// <param name="deliveryMode">Delivery mode stored on the message.</param>
+      /// <param name="priority">Priority stored on the message (narrowed to a byte).</param>
+      /// <param name="timeToLive">Added to the timestamp to obtain the expiration; 0 leaves the expiration at 0.</param>
+      /// <param name="mandatory">Mandatory flag placed in the publish frame.</param>
+      /// <param name="immediate">Immediate flag placed in the publish frame.</param>
+      private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
+      {
+         // todo: handle session access ticket
+         AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
+            _channel.ChannelId, 0, exchangeName,
+            routingKey, mandatory, immediate
+            );
+
+         // fix message properties
+         if ( !_disableTimestamps )
+         {
+            message.Timestamp = DateTime.UtcNow.Ticks;
+            if ( timeToLive > 0 )
+            {
+               // NOTE(review): Ticks are 100 ns units while the producer documents its time
+               // to live in milliseconds - confirm the units are reconciled elsewhere.
+               message.Expiration = message.Timestamp + timeToLive;
+            } else
+            {
+               // without a time to live the expiration is cleared instead of being
+               // stamped with the send time
+               message.Expiration = 0;
+            }
+         } else
+         {
+            message.Expiration = 0;
+         }
+         message.DeliveryMode = deliveryMode;
+         message.Priority = (byte)priority;
+
+         ByteBuffer payload = message.Data;
+         // Measured before the bodies are built, and with Remaining - the same measure
+         // CreateContentBodies sizes the body frames by - so that the body size announced
+         // in the content header matches the body frames that follow it.
+         int payloadLength = (payload == null) ? 0 : payload.Remaining;
+
+         ContentBody[] contentBodies = CreateContentBodies(payload);
+         if ( contentBodies == null )
+         {
+            // a message without data is sent as publish + header frames only
+            contentBodies = new ContentBody[0];
+         }
+         // slots 0 and 1 are reserved for the publish and content header frames
+         AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+         for ( int i = 0; i < contentBodies.Length; i++ )
+         {
+            frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+         }
+         if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
+         {
+            _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+         }
+
+         // weight argument of zero indicates no child content headers, just bodies
+         AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
+            _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0, 
+            message.ContentHeaderProperties, (uint)payloadLength
+            );
+         if ( _logger.IsDebugEnabled )
+         {
+            _logger.Debug(string.Format("Sending content header frame to  {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+         }
+
+         frames[0] = publishFrame;
+         frames[1] = contentHeaderFrame;
+         CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+         lock ( _channel.Connection.FailoverMutex )
+         {
+            _channel.Connection.ProtocolWriter.Write(compositeFrame);
+         }
+      }
+
+
+      /// <summary>
+      /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+      /// maximum frame size.
+      /// </summary>
+      /// <param name="payload">Buffer holding the message data; may be null.</param>
+      /// <returns>The array of content bodies; empty (never null) when there is no data to send.</returns>
+      private ContentBody[] CreateContentBodies(ByteBuffer payload)
+      {
+         if ( payload == null || payload.Remaining == 0 )
+         {
+            // "nothing to send" is reported as an empty array so that callers can use
+            // Length without a null check
+            return new ContentBody[0];
+         }
+         // we subtract one from the total frame maximum size to account for the end of frame marker in a body frame
+         // (0xCE byte).
+         int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
+         int frameCount = CalculateContentBodyFrames(payload);
+         ContentBody[] bodies = new ContentBody[frameCount];
+         for ( int i = 0; i < frameCount; i++ )
+         {
+            // Each body takes at most one frame's worth of what is left in the buffer.
+            // Assumes the ContentBody constructor advances payload past the bytes it
+            // takes - TODO confirm.
+            int length = Math.Min(payload.Remaining, framePayloadMax);
+            bodies[i] = new ContentBody(payload, (uint)length);
+         }
+         return bodies;
+      }
+
+      /// <summary>
+      /// Works out how many content body frames are needed to carry the payload.
+      /// </summary>
+      /// <param name="payload">Buffer holding the message data; may be null.</param>
+      /// <returns>
+      /// 0 for a null or empty buffer; otherwise the number of remaining bytes divided by
+      /// the per-frame capacity, rounded up.
+      /// </returns>
+      private int CalculateContentBodyFrames(ByteBuffer payload)
+      {
+         if ( payload == null )
+         {
+            return 0;
+         }
+
+         int bytesToSend = payload.Remaining;
+         if ( bytesToSend == 0 )
+         {
+            return 0;
+         }
+
+         // we subtract one from the total frame maximum size to account
+         // for the end of frame marker in a body frame
+         // (0xCE byte).
+         int capacity = (int)_channel.Connection.MaximumFrameSize - 1;
+         int fullFrames = bytesToSend / capacity;
+         bool needsPartialFrame = (bytesToSend % capacity) > 0;
+         return needsPartialFrame ? fullFrames + 1 : fullFrames;
+      }
+      #endregion // Message Publishing
+   }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs Thu May 17 17:51:12 2007
@@ -45,14 +45,12 @@
             _logger.Debug(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat));
 
             parameters.FrameMax = frame.FrameMax;
-            parameters.FrameMax = 65535;
-            //params.setChannelMax(frame.channelMax);
             parameters.Heartbeat = frame.Heartbeat;
             session.ConnectionTuneParameters = parameters;
 
             stateManager.ChangeState(AMQState.CONNECTION_NOT_OPENED);
             session.WriteFrame(ConnectionTuneOkBody.CreateAMQFrame(
-                                   evt.ChannelId, frame.ChannelMax, 65535, frame.Heartbeat));
+                                   evt.ChannelId, frame.ChannelMax, frame.FrameMax, frame.Heartbeat));
             session.WriteFrame(ConnectionOpenBody.CreateAMQFrame(
                                    evt.ChannelId, session.AMQConnection.VirtualHost, null, true));
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs Thu May 17 17:51:12 2007
@@ -27,7 +27,7 @@
 {
     public abstract class AbstractQmsMessageFactory : IMessageFactory
     {
-        public abstract AbstractQmsMessage CreateMessage();
+        public abstract AbstractQmsMessage CreateMessage(string mimeType);
 
         private static readonly ILog _logger = LogManager.GetLogger(typeof (AbstractQmsMessageFactory));
 
@@ -43,7 +43,7 @@
             if (bodies != null && bodies.Count == 1)
             {
                 _logger.Debug("Non-fragmented message body (bodySize=" + contentHeader.BodySize +")");
-                data = ByteBuffer.Wrap(((ContentBody)bodies[0]).Payload);
+                data = ((ContentBody)bodies[0]).Payload;
             }
             else
             {

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs Thu May 17 17:51:12 2007
@@ -72,24 +72,29 @@
 
 #endregion
 
+        #region Properties
+        //
+        // Properties
+        //
+
+        /// <summary>
+        /// The application message identifier
+        /// </summary>
         public string MessageId
         {
-            get
-            {
+            get {
                 if (ContentHeaderProperties.MessageId == null)
                 {
                     ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
                 }
                 return ContentHeaderProperties.MessageId;
             }
-            set
-            {
-                ContentHeaderProperties.MessageId = value;
-            }
-
-
-        }        
+            set { ContentHeaderProperties.MessageId = value; }
+        }
 
+        /// <summary>
+        /// The message timestamp
+        /// </summary>
         public long Timestamp
         {
             get
@@ -103,36 +108,22 @@
             }
         }        
 
-        protected void CheckReadable()
-        {
-            if (!_readableMessage)
-            {
-                throw new MessageNotReadableException("You need to call reset() to make the message readable");
-            }
-        }
-
+        /// <summary>
+        /// The <see cref="CorrelationId"/> as a byte array.
+        /// </summary>
         public byte[] CorrelationIdAsBytes
         {
-            get
-            {            
-                return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId);
-            }
-            set
-            {
-                ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value);
-            }
+            get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); }
+            set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); }
         }
 
+        /// <summary>
+        /// The application correlation identifier
+        /// </summary>
         public string CorrelationId
         {
-            get
-            {
-                return ContentHeaderProperties.CorrelationId;
-            }
-            set
-            {
-                ContentHeaderProperties.ContentType = value;
-            }
+            get { return ContentHeaderProperties.CorrelationId; }
+            set { ContentHeaderProperties.CorrelationId = value; }
         }
         
         struct Dest
@@ -147,6 +138,9 @@
             }
         }
 
+        /// <summary>
+        /// Exchange name of the reply-to address
+        /// </summary>
         public string ReplyToExchangeName
         {
             get
@@ -162,6 +156,9 @@
             }
         }
 
+        /// <summary>
+        /// Routing key of the reply-to address
+        /// </summary>
         public string ReplyToRoutingKey
         {
             get
@@ -177,50 +174,11 @@
             }
         }
 
-        /// <summary>
-        /// Decodes the replyto field if one is set.
-        /// 
-        /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
-        /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
-        /// empty the replyto field is expected to being with ':'.
-        /// 
-        /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
-        /// </summary>
-        /// 
-        /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
-        private Dest ReadReplyToHeader()
-        {
-            string replyToEncoding = ContentHeaderProperties.ReplyTo;
-
-            if (replyToEncoding == null)
-            {
-                return new Dest();
-            }
-            else
-            {
-                // Split the replyto field on a ':'
-                string[] split = replyToEncoding.Split(':');
-
-                // Ensure that the replyto field argument only consisted of two parts.
-                if (split.Length != 2)
-                {
-                    throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
-                }
 
-                // Extract the exchange name and routing key from the split replyto field.
-                string exchangeName = split[0];
-                string routingKey = split[1];
-
-                return new Dest(exchangeName, routingKey);                
-            }            
-        }
-        
-        private void WriteReplyToHeader(Dest dest)
-        {
-            string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
-            ContentHeaderProperties.ReplyTo = encodedDestination;            
-        }
 
+        /// <summary>
+        /// Non-persistent (1) or persistent (2)
+        /// </summary>
         public DeliveryMode DeliveryMode
         {
             get
@@ -242,100 +200,94 @@
             }
         }        
 
+       /// <summary>
+       /// True, if this is a redelivered message
+       /// </summary>
         public bool Redelivered
         {
-            get
-            {
-                return _redelivered;
-            }
-            set
-            {
-                _redelivered = value;
-            }
-        }        
+            get { return _redelivered; }
+            set { _redelivered = value; }
+        }
 
+        /// <summary>
+        /// The message type name
+        /// </summary>
         public string Type
         {
-            get
-            {
-                return MimeType;
-            }            
-            set
-            {
-                //MimeType = value;
-            }
+            get { return ContentHeaderProperties.Type; }
+            set { ContentHeaderProperties.Type = value; }
         }
-        
+
+        /// <summary>
+        /// Message expiration specification
+        /// </summary>
         public long Expiration
         {
-            get
-            {
-                return ContentHeaderProperties.Expiration;
-            }
-            set
-            {
-                ContentHeaderProperties.Expiration = value;
-            }
+            get { return ContentHeaderProperties.Expiration; }
+            set { ContentHeaderProperties.Expiration = value; }
         }
 
-        public int Priority
+        /// <summary>
+        /// The message priority, 0 to 9
+        /// </summary>
+        public byte Priority
         {
-            get
-            {
-                return ContentHeaderProperties.Priority;
-            }
-            set
-            {
-                ContentHeaderProperties.Priority = (byte) value;
-            }
+            get { return ContentHeaderProperties.Priority; }
+            set { ContentHeaderProperties.Priority = (byte) value; }
         }
 
-        // FIXME: implement
+        /// <summary>
+        /// The MIME Content Type
+        /// </summary>
         public string ContentType
         {
-            get { throw new NotImplementedException(); }
-            set { throw new NotImplementedException(); }
+            get { return ContentHeaderProperties.ContentType; }
+            set { ContentHeaderProperties.ContentType = value; }
         }
 
-        // FIXME: implement
+        /// <summary>
+        /// The MIME Content Encoding
+        /// </summary>
         public string ContentEncoding
         {
-            get { throw new NotImplementedException(); }
-            set { throw new NotImplementedException(); }
-        }
-
-        public void Acknowledge()
-        {
-            // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
-            // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
-            if (_channel != null)
-            {
-                // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
-                // received on the session
-                _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
-            }
-
+            get { return ContentHeaderProperties.Encoding; }
+            set { ContentHeaderProperties.Encoding = value; }
         }
 
+        /// <summary>
+        /// Headers of this message
+        /// </summary>
         public IHeaders Headers
         {
             get { return _headers; }
         }
 
-        public abstract void ClearBodyImpl();
+        /// <summary>
+        /// The creating user id
+        /// </summary>
+        public string UserId
+        {
+            get { return ContentHeaderProperties.UserId; }
+            set { ContentHeaderProperties.UserId = value; }
+        }
 
-        public void ClearBody()
+        /// <summary>
+        /// The creating application id
+        /// </summary>
+        public string AppId
         {
-            ClearBodyImpl();
-            _readableMessage = false;
+            get { return ContentHeaderProperties.AppId; }
+            set { ContentHeaderProperties.AppId = value; }
         }
 
         /// <summary>
-        /// Get a String representation of the body of the message. Used in the
-        /// toString() method which outputs this before message properties.
+        /// Intra-cluster routing identifier
         /// </summary>
-        /// <exception cref="QpidException"></exception>
-        public abstract string ToBodyString();
+        public string ClusterId
+        {
+            get { return ContentHeaderProperties.ClusterId; }
+            set { ContentHeaderProperties.ClusterId = value; }
+        }
 
         /// <summary>
         /// Return the raw byte array that is used to populate the frame when sending
@@ -367,12 +319,37 @@
                 _data = value;
             }
         }
+        #endregion // Properties
+
+
+        /// <summary>Acknowledges this message on the channel it is associated with, if any.</summary>
+        public void Acknowledge()
+        {
+            // Per section 3.6 of the JMS 1.1 spec, acknowledge calls are ignored unless client acknowledge
+            // mode was specified; the channel field is only set in that mode, so no channel means no-op.
+            if (_channel == null)
+            {
+                return;
+            }
+            // multiple=true: acknowledging implies acknowledging everything received earlier on the session
+            _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
+        }
 
-        public abstract string MimeType
+        public abstract void ClearBodyImpl();
+
+        public void ClearBody()
         {
-            get;           
+            ClearBodyImpl();
+            _readableMessage = false;
         }
 
+        /// <summary>
+        /// Get a String representation of the body of the message. Used in the
+        /// toString() method which outputs this before message properties.
+        /// </summary>
+        /// <exception cref="QpidException"></exception>
+        public abstract string ToBodyString();
+
         public override string ToString()
         {
             try
@@ -403,18 +380,6 @@
             }
         }
 
-        public IFieldTable UnderlyingMessagePropertiesMap
-        {
-            get
-            {
-                return ContentHeaderProperties.Headers;
-            }
-            set
-            {
-                ContentHeaderProperties.Headers = (FieldTable)value;
-            }
-        }
-        
         public FieldTable PopulateHeadersFromMessageProperties()
         {
             if (ContentHeaderProperties.Headers == null)
@@ -465,6 +430,57 @@
         public bool isWritable
         {
             get { return !_readableMessage; }
+        }
+
+        /// <summary>Throws <see cref="MessageNotReadableException"/> unless the message is readable.</summary>
+        protected void CheckReadable()
+        {
+           if ( _readableMessage ) { return; }
+
+           throw new MessageNotReadableException("You need to call reset() to make the message readable");
+        }
+
+        /// <summary>
+        /// Parses the reply-to header, if present, into its two components.
+        /// 
+        /// The header is encoded as an exchange name, a ':' separator and then a routing key. The exchange name may be
+        /// empty, in which case the header begins with ':' and the empty string is used as the exchange name.
+        /// 
+        /// A header that does not split into exactly two parts around ':' is rejected with an exception.
+        /// </summary>
+        /// 
+        /// <returns>A destination built from the reply-to header if one was set, or a default destination otherwise.</returns>
+        private Dest ReadReplyToHeader()
+        {
+           string encoded = ContentHeaderProperties.ReplyTo;
+
+           // No reply-to header set: hand back a default destination.
+           if ( encoded == null )
+           {
+              return new Dest();
+           }
+
+           // Break the header apart on the ':' separator.
+           string[] parts = encoded.Split(':');
+
+           // Exactly two parts (exchange name, routing key) are accepted,
+           // which also means neither part may itself contain a ':'.
+           if ( parts.Length != 2 )
+           {
+              throw new QpidException("Illegal value in ReplyTo property: " + encoded);
+           }
+
+           // parts[0] is the exchange name (possibly empty), parts[1] the routing key.
+           string exchange = parts[0];
+           string key = parts[1];
+
+           return new Dest(exchange, key);
+        }
+
+        /// <summary>Stores a destination in the reply-to header as "exchange:routingKey".</summary>
+        private void WriteReplyToHeader(Dest dest)
+        {
+           ContentHeaderProperties.ReplyTo = dest.ExchangeName + ":" + dest.RoutingKey;
+        }
     }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs Thu May 17 17:51:12 2007
@@ -28,11 +28,12 @@
         /// <summary>
         /// Create a message
         /// </summary>
-        /// <param name="messageNbr"></param>
-        /// <param name="redelivered"></param>
-        /// <param name="contentHeader"></param>
-        /// <param name="bodies"></param>
-        /// <returns></returns>
+        /// <param name="deliverTag">Delivery Tag</param>
+        /// <param name="messageNbr">Message Sequence Number</param>
+        /// <param name="redelivered">True if this is a redelivered message</param>
+        /// <param name="contentHeader">Content headers</param>
+        /// <param name="bodies">Message bodies</param>
+        /// <returns>The new message</returns>
         /// <exception cref="QpidMessagingException">if the message cannot be created</exception>
         AbstractQmsMessage CreateMessage(long deliverTag, bool redelivered,
                                          ContentHeaderBody contentHeader,
@@ -41,9 +42,10 @@
         /// <summary>
         /// Creates the message.
         /// </summary>
-        /// <returns></returns>
+        /// <param name="mimeType">Mime type to associate the new message with</param>
+        /// <returns>The new message</returns>
         /// <exception cref="QpidMessagingException">if the message cannot be created</exception>
-        AbstractQmsMessage CreateMessage();
+        AbstractQmsMessage CreateMessage(string mimeType);
     }
 }