You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by to...@apache.org on 2007/05/26 19:35:52 UTC

svn commit: r541920 - in /incubator/qpid/branches/M2/dotnet: Qpid.Client.Tests/Common/ Qpid.Client.Tests/SimpleConsumer/ Qpid.Client/ Qpid.Client/Client/ Qpid.Client/Client/Message/ Qpid.Client/Client/Util/ Qpid.Messaging/

Author: tomasr
Date: Sat May 26 10:35:51 2007
New Revision: 541920

URL: http://svn.apache.org/viewvc?view=rev&rev=541920
Log:
QPID-136 Initial Prefetch Implementation

Added:
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
Modified:
    incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
    incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build
    incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs Sat May 26 10:35:51 2007
@@ -56,7 +56,7 @@
             {
                 IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);               
                 _connection = new AMQConnection(connectionInfo);
-                _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge, 1);
+                _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge, 500, 300);
             }
             catch (QpidException e)
             {

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs Sat May 26 10:35:51 2007
@@ -96,7 +96,7 @@
             _publisher.Send(msg);
          }
 
-         _logger.Error("All messages sent");
+         _logger.Debug("All messages sent");
          // receive all messages
          for ( int i = 0; i < MESSAGE_COUNT; i++ )
          {

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs Sat May 26 10:35:51 2007
@@ -273,15 +273,17 @@
 
             private bool _transacted;
             private AcknowledgeMode _acknowledgeMode;
-            int _prefetch;
+            int _prefetchHigh;
+            int _prefetchLow;
             AMQConnection _connection;
             
-            public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
+            public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
             {
                 _connection = connection;
                 _transacted = transacted;
                 _acknowledgeMode = acknowledgeMode;
-                _prefetch = prefetch;
+                _prefetchHigh = prefetchHigh;
+                _prefetchLow = prefetchLow;
             }
 
             protected override object operation()
@@ -297,14 +299,14 @@
                 // open it, so that there is no window where we could receive data on the channel and not be set
                 // up to handle it appropriately.
                 AmqChannel channel = new AmqChannel(_connection, 
-                        channelId, _transacted, _acknowledgeMode, _prefetch);
+                        channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow);
                 _connection.ProtocolSession.AddSessionByChannel(channelId, channel);
                 _connection.RegisterSession(channelId, channel);
 
                 bool success = false;
                 try
                 {
-                    _connection.createChannelOverWire(channelId, (ushort)_prefetch, _transacted);
+                    _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted);
                     success = true;
                 }
                 catch (AMQException e)
@@ -334,11 +336,16 @@
         
         public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode)
         {
-            return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH);
+            return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK);
         }
 
         public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
         {
+           return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch);
+        }
+   
+        public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
+        {
             CheckNotClosed();
             if (ChannelLimitReached())
             {
@@ -347,7 +354,7 @@
             else
             {
                 CreateChannelFailoverSupport operation =
-                    new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetch);
+                    new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow);
                 return (IChannel)operation.execute(this);
             }
         }
@@ -774,18 +781,23 @@
             foreach (AmqChannel channel in channels)
             {
                 _protocolSession.AddSessionByChannel(channel.ChannelId, channel);
-                ReopenChannel(channel.ChannelId, (ushort)channel.DefaultPrefetch, channel.Transacted);
+                ReopenChannel(
+                   channel.ChannelId, 
+                   channel.DefaultPrefetchHigh, 
+                   channel.DefaultPrefetchLow, 
+                   channel.Transacted
+                   );
                 channel.ReplayOnFailOver();
             }
         }
 
-        private void ReopenChannel(ushort channelId, ushort prefetch, bool transacted)
+        private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
         {
-            _log.Debug(string.Format("Reopening channel id={0} prefetch={1} transacted={2}",
-                channelId, prefetch, transacted));
+            _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}",
+                channelId, prefetchHigh, prefetchLow, transacted));
             try
             {
-                createChannelOverWire(channelId, prefetch, transacted);
+                CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
             }
             catch (AMQException e)
             {
@@ -795,7 +807,7 @@
             }
         }
 
-        void createChannelOverWire(ushort channelId, ushort prefetch, bool transacted)
+        void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
         {
             _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody));
             
@@ -805,7 +817,8 @@
             {
                 // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d.
                 _protocolWriter.SyncWrite(
-                    BasicQosBody.CreateAMQFrame(channelId, 0, prefetch, false),
+                    BasicQosBody.CreateAMQFrame(
+                    channelId, (uint)prefetchHigh, 0, false),
                     typeof (BasicQosOkBody));                
             }
             

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs Sat May 26 10:35:51 2007
@@ -25,6 +25,7 @@
 using log4net;
 using Qpid.Buffer;
 using Qpid.Client.Message;
+using Qpid.Client.Util;
 using Qpid.Collections;
 using Qpid.Framing;
 using Qpid.Messaging;
@@ -41,11 +42,14 @@
         private static int _nextSessionNumber = 0;
 
         private int _sessionNumber;
+        private bool _suspended;
+        private object _suspensionLock = new object();
         
         // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
         private int _nextConsumerNumber = 1;
 
-        internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH;
+        public const int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+        public const int DEFAULT_PREFETCH_LOW_MARK = 2500;
 
         private AMQConnection _connection;
 
@@ -55,9 +59,10 @@
 
         private ushort _channelId;
 
-        private int _defaultPrefetch = DEFAULT_PREFETCH;
+        private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+        private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
 
-        private BlockingQueue _queue = new LinkedBlockingQueue();
+        private FlowControlQueue _queue;
 
         private Dispatcher _dispatcher;
 
@@ -105,7 +110,7 @@
             {
                 UnprocessedMessage message;
 
-                while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.DequeueBlocking()) != null)
+                while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null)
                 {
                     //_queue.size()
                     DispatchMessage(message);
@@ -163,8 +168,9 @@
         /// <param name="channelId">The channel id.</param>
         /// <param name="transacted">if set to <c>true</c> [transacted].</param>
         /// <param name="acknowledgeMode">The acknowledge mode.</param>
-        /// <param name="defaultPrefetch">Default prefetch value</param>
-        internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch)
+        /// <param name="defaultPrefetchHigh">Default prefetch high value</param>
+        /// <param name="defaultPrefetchLow">Default prefetch low value</param>
+        internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
            : this()
         {
            _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
@@ -178,8 +184,26 @@
               _acknowledgeMode = acknowledgeMode;
            }
            _channelId = channelId;
+           _defaultPrefetchHighMark = defaultPrefetchHigh;
+           _defaultPrefetchLowMark = defaultPrefetchLow;
+
+           if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
+           {
+              _queue = new FlowControlQueue(
+                 _defaultPrefetchLowMark, _defaultPrefetchHighMark,
+                 new ThresholdMethod(OnPrefetchLowMark),
+                 new ThresholdMethod(OnPrefetchHighMark)
+              );
+           } else
+           {
+              // low and upper are the same
+              _queue = new FlowControlQueue(
+                 _defaultPrefetchHighMark, _defaultPrefetchHighMark,
+                 null, null
+                 );
+           }
         }
-        
+
         private AmqChannel()
         {
            _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
@@ -269,19 +293,30 @@
 
         public void Rollback()
         {
-            // FIXME: Fail over safety. Needs FailoverSupport?
-            CheckNotClosed();
-            CheckTransacted(); // throws IllegalOperationException if not a transacted session
+           lock ( _suspensionLock ) // same lock object that Suspend() takes; C# monitor locks are reentrant
+           {
+              CheckTransacted(); // throws IllegalOperationException if not a transacted session. NOTE(review): the earlier CheckNotClosed() call is no longer made - confirm intentional
 
-            try
-            {
-                _connection.ConvenientProtocolWriter.SyncWrite(
-                        TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
-            }
-            catch (AMQException e)
-            {
-                throw new QpidException("Failed to rollback", e);
-            }
+              try
+              {
+                 bool suspended = IsSuspended; // remember the prior state so it is only restored if this method changed it
+                 if ( !suspended )
+                    Suspend(true);
+                 
+                 // todo: rollback dispatcher when TX support is added
+                 //if ( _dispatcher != null )
+                 //   _dispatcher.Rollback();
+
+                 _connection.ConvenientProtocolWriter.SyncWrite(
+                         TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+
+                 if ( !suspended ) // NOTE(review): not in a finally block, so this is skipped when the SyncWrite above throws and the channel stays suspended
+                    Suspend(false);
+              } catch ( AMQException e ) // also covers an AMQException raised by the Suspend calls inside the try
+              {
+                 throw new QpidException("Failed to rollback", e);
+              }
+           }
         }
 
         public override void Close()
@@ -539,21 +574,26 @@
                ReturnBouncedMessage(message);
             } else
             {
-               _queue.EnqueueBlocking(message);
+               _queue.Enqueue(message);
             }
         }
 
         public int DefaultPrefetch
         {
-            get
-            {
-                return _defaultPrefetch;
-            }
-            set
-            {
-                _defaultPrefetch = value;
-            }
-        }        
+           get { return DefaultPrefetchHigh; }
+        }
+        public int DefaultPrefetchLow
+        {
+            get { return _defaultPrefetchLowMark; }
+        }
+        public int DefaultPrefetchHigh
+        {
+           get { return _defaultPrefetchHighMark; }
+        }
+        public bool IsSuspended
+        {
+           get { return _suspended; }
+        }
 
         public ushort ChannelId
         {
@@ -581,6 +621,7 @@
 
         internal void Stop()
         {
+            Suspend(true);
             if (_dispatcher != null)
             {
                 _dispatcher.StopDispatcher();
@@ -883,7 +924,7 @@
          * @param multiple    if true will acknowledge all messages up to and including the one specified by the
          *                    delivery tag
          */
-        public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
+        internal void AcknowledgeMessage(ulong deliveryTag, bool multiple)
         {
             AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);
             if (_logger.IsDebugEnabled)
@@ -930,5 +971,39 @@
           }
 
        }
+
+       private void OnPrefetchLowMark(int count) // passed to FlowControlQueue as its under-threshold callback; unsuspends the channel
+       {
+          if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) // same condition under which the constructor registers this callback
+          {
+             _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); // NOTE(review): Warn level for what looks like routine flow control - confirm intended
+             Suspend(false);
+          }
+       }
+       private void OnPrefetchHighMark(int count) // passed to FlowControlQueue as its over-threshold callback; suspends the channel
+       {
+          if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) // same condition under which the constructor registers this callback
+          {
+             _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); // NOTE(review): Warn level for what looks like routine flow control - confirm intended
+             Suspend(true);
+          }
+       }
+
+       private void Suspend(bool suspend) // records the requested state, then sync-writes a ChannelFlowBody frame built with !suspend and waits for ChannelFlowOkBody
+       {
+          lock ( _suspensionLock ) // serializes flow changes with each other and with Rollback(), which takes the same lock
+          {
+             if ( _logger.IsDebugEnabled )
+             {
+                _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
+             }
+
+             _suspended = suspend; // NOTE(review): updated before the write below, so the flag keeps the new value even if SyncWrite throws
+             AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend);
+
+             Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); // synchronous write performed while _suspensionLock is held
+          }
+       }
+
     }
 }

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs Sat May 26 10:35:51 2007
@@ -326,7 +326,7 @@
             // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
             if (_channel != null)
             {
-                // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+                // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
                 // received on the session
                 _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
             }

Added: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs?view=auto&rev=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs (added)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs Sat May 26 10:35:51 2007
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using Qpid.Collections;
+using Qpid.Common;
+
+namespace Qpid.Client.Util
+{
+   internal delegate void ThresholdMethod(int currentCount);
+
+   /// <summary>
+   /// Queue used to implement prefetching. Wraps a blocking queue, tracks the item
+   /// count, and fires the threshold callbacks asynchronously (BeginInvoke) when the
+   /// count is exactly equal to a bound; the bounds are never checked before enqueuing.
+   /// </summary>
+   internal class FlowControlQueue
+   {
+      private BlockingQueue _queue = new LinkedBlockingQueue();
+      private int _itemCount; // only modified through Interlocked.Increment/Decrement
+      private int _lowerBound; // count at which Dequeue fires _underThreshold
+      private int _upperBound; // count at which Enqueue fires _overThreshold
+      private ThresholdMethod _underThreshold; // may be null, in which case Dequeue never signals
+      private ThresholdMethod _overThreshold; // may be null, in which case Enqueue never signals
+
+      public FlowControlQueue(
+         int lowerBound, 
+         int upperBound,
+         ThresholdMethod underThreshold,
+         ThresholdMethod overThreshold
+         )
+      {
+         _lowerBound = lowerBound; // NOTE(review): no validation that lowerBound <= upperBound
+         _upperBound = upperBound;
+         _underThreshold = underThreshold;
+         _overThreshold = overThreshold;
+      }
+
+      public void Enqueue(object item) // adds item, then signals _overThreshold if the new count equals _upperBound
+      {
+         _queue.EnqueueBlocking(item);
+         int count = Interlocked.Increment(ref _itemCount); // NOTE(review): the item is enqueued before the count is raised, so a concurrent Dequeue may decrement first
+         if ( _overThreshold != null )
+         {
+            if ( count == _upperBound ) // NOTE(review): equality, not >=; no further callback if the count keeps growing past the bound
+            {
+               _overThreshold.BeginInvoke(
+                  count, new AsyncCallback(OnAsyncCallEnd), 
+                  _overThreshold
+                  );
+            }
+         }
+      }
+
+      public object Dequeue() // removes an item, then signals _underThreshold if the new count equals _lowerBound
+      {
+         object item = _queue.DequeueBlocking(); // presumably blocks until an item is available - confirm against BlockingQueue
+         int count = Interlocked.Decrement(ref _itemCount);
+         if ( _underThreshold != null )
+         {
+            if ( count == _lowerBound ) // NOTE(review): fires even if _overThreshold never fired; with lowerBound == upperBound a dequeue right after the over callback yields upperBound - 1, which does not match
+            {
+               _underThreshold.BeginInvoke(
+                  count, new AsyncCallback(OnAsyncCallEnd),
+                  _underThreshold
+                  );
+            }
+         }
+         return item;
+      }
+
+      private void OnAsyncCallEnd(IAsyncResult res) // completion callback shared by both BeginInvoke calls above
+      {
+         ThresholdMethod method = (ThresholdMethod)res.AsyncState; // the delegate itself was passed as the async state
+         method.EndInvoke(res); // NOTE(review): EndInvoke rethrows an exception raised by the callback; nothing here catches it
+      }
+   }
+}

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj Sat May 26 10:35:51 2007
@@ -120,6 +120,7 @@
     <Compile Include="Client\Transport\Socket\Blocking\SslSocketConnector.cs" />
     <Compile Include="Client\Transport\Socket\Blocking\SocketConnector.cs" />
     <Compile Include="Client\Transport\Socket\Blocking\ISocketConnector.cs" />
+    <Compile Include="Client\Util\FlowControlQueue.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="qms\BrokerInfo.cs" />
     <Compile Include="qms\ConnectionInfo.cs" />

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build Sat May 26 10:35:51 2007
@@ -23,7 +23,7 @@
             <include name="${build.dir}/Qpid.Codec.dll" />	
             <include name="${build.dir}/Qpid.Common.dll" />	
             <include name="${build.dir}/Qpid.Messaging.dll" />	
-            <include name="${build.dir}/Org.Mentalis.Security.dll" />	
+            <include name="${build.dir}/Org.Mentalis.Security.dll" />
          </references>
         </csc>
     </target>

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs Sat May 26 10:35:51 2007
@@ -43,10 +43,23 @@
         bool Transacted { get; }
 
         /// <summary>
-        /// Prefetch value to be used as the default for consumers created on this channel.
+        /// Prefetch value to be used as the default for 
+        /// consumers created on this channel.
         /// </summary>
-        int DefaultPrefetch { get; set; }
-        
+        int DefaultPrefetch { get; }
+
+        /// <summary>
+        /// Prefetch low value to be used as the default for 
+        /// consumers created on this channel.
+        /// </summary>
+        int DefaultPrefetchLow { get; }
+
+        /// <summary>
+        /// Prefetch high value to be used as the default for 
+        /// consumers created on this channel.
+        /// </summary>
+        int DefaultPrefetchHigh { get; }
+
         /// <summary>
         /// Declare a new exchange
         /// </summary>

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs Sat May 26 10:35:51 2007
@@ -47,6 +47,7 @@
 
         IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode);
         IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch);
+        IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow);
 
         void Start();
         void Stop();

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs Sat May 26 10:35:51 2007
@@ -22,21 +22,21 @@
 {
     public class MessageConsumerBuilder
     {
-        public const int DEFAULT_PREFETCH_HIGH = 5000;
-
         private bool _noLocal = false;
         private bool _exclusive = false;
         private bool _durable = false;
         private string _subscriptionName = null;
         private IChannel _channel;
         private readonly string _queueName;
-        private int _prefetchLow = 2500;
-        private int _prefetchHigh = DEFAULT_PREFETCH_HIGH;
+        private int _prefetchLow;
+        private int _prefetchHigh;
 
         public MessageConsumerBuilder(IChannel channel, string queueName)
         {
             _channel = channel;
             _queueName = queueName;
+            _prefetchHigh = _channel.DefaultPrefetchHigh;
+            _prefetchLow = _channel.DefaultPrefetchLow;
         }
 
         public MessageConsumerBuilder WithPrefetchLow(int prefetchLow)