You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by to...@apache.org on 2007/05/26 19:35:52 UTC
svn commit: r541920 - in /incubator/qpid/branches/M2/dotnet:
Qpid.Client.Tests/Common/ Qpid.Client.Tests/SimpleConsumer/ Qpid.Client/
Qpid.Client/Client/ Qpid.Client/Client/Message/ Qpid.Client/Client/Util/
Qpid.Messaging/
Author: tomasr
Date: Sat May 26 10:35:51 2007
New Revision: 541920
URL: http://svn.apache.org/viewvc?view=rev&rev=541920
Log:
QPID-136 Initial Prefetch Implementation
Added:
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
Modified:
incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build
incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs
incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs
incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs Sat May 26 10:35:51 2007
@@ -56,7 +56,7 @@
{
IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
_connection = new AMQConnection(connectionInfo);
- _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge, 1);
+ _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge, 500, 300);
}
catch (QpidException e)
{
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs Sat May 26 10:35:51 2007
@@ -96,7 +96,7 @@
_publisher.Send(msg);
}
- _logger.Error("All messages sent");
+ _logger.Debug("All messages sent");
// receive all messages
for ( int i = 0; i < MESSAGE_COUNT; i++ )
{
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQConnection.cs Sat May 26 10:35:51 2007
@@ -273,15 +273,17 @@
private bool _transacted;
private AcknowledgeMode _acknowledgeMode;
- int _prefetch;
+ int _prefetchHigh;
+ int _prefetchLow;
AMQConnection _connection;
- public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
+ public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
{
_connection = connection;
_transacted = transacted;
_acknowledgeMode = acknowledgeMode;
- _prefetch = prefetch;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
}
protected override object operation()
@@ -297,14 +299,14 @@
// open it, so that there is no window where we could receive data on the channel and not be set
// up to handle it appropriately.
AmqChannel channel = new AmqChannel(_connection,
- channelId, _transacted, _acknowledgeMode, _prefetch);
+ channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow);
_connection.ProtocolSession.AddSessionByChannel(channelId, channel);
_connection.RegisterSession(channelId, channel);
bool success = false;
try
{
- _connection.createChannelOverWire(channelId, (ushort)_prefetch, _transacted);
+ _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted);
success = true;
}
catch (AMQException e)
@@ -334,11 +336,16 @@
public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode)
{
- return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH);
+     // Forward both default marks. Going through the single-prefetch overload
+     // would use the high mark for the low mark as well, leaving
+     // DEFAULT_PREFETCH_LOW_MARK unused for default channels.
+     return CreateChannel(
+         transacted, acknowledgeMode,
+         AmqChannel.DEFAULT_PREFETCH_HIGH_MARK,
+         AmqChannel.DEFAULT_PREFETCH_LOW_MARK);
}
public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
{
+ return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch);
+ }
+
+ public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
+ {
CheckNotClosed();
if (ChannelLimitReached())
{
@@ -347,7 +354,7 @@
else
{
CreateChannelFailoverSupport operation =
- new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetch);
+ new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow);
return (IChannel)operation.execute(this);
}
}
@@ -774,18 +781,23 @@
foreach (AmqChannel channel in channels)
{
_protocolSession.AddSessionByChannel(channel.ChannelId, channel);
- ReopenChannel(channel.ChannelId, (ushort)channel.DefaultPrefetch, channel.Transacted);
+ ReopenChannel(
+ channel.ChannelId,
+ channel.DefaultPrefetchHigh,
+ channel.DefaultPrefetchLow,
+ channel.Transacted
+ );
channel.ReplayOnFailOver();
}
}
- private void ReopenChannel(ushort channelId, ushort prefetch, bool transacted)
+ private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
{
- _log.Debug(string.Format("Reopening channel id={0} prefetch={1} transacted={2}",
- channelId, prefetch, transacted));
+ _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}",
+ channelId, prefetchHigh, prefetchLow, transacted));
try
{
- createChannelOverWire(channelId, prefetch, transacted);
+ CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
}
catch (AMQException e)
{
@@ -795,7 +807,7 @@
}
}
- void createChannelOverWire(ushort channelId, ushort prefetch, bool transacted)
+ void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
{
_protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody));
@@ -805,7 +817,8 @@
{
// Basic.Qos frame appears to not be supported by OpenAMQ 1.0d.
_protocolWriter.SyncWrite(
- BasicQosBody.CreateAMQFrame(channelId, 0, prefetch, false),
+ BasicQosBody.CreateAMQFrame(
+ channelId, (uint)prefetchHigh, 0, false),
typeof (BasicQosOkBody));
}
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs Sat May 26 10:35:51 2007
@@ -25,6 +25,7 @@
using log4net;
using Qpid.Buffer;
using Qpid.Client.Message;
+using Qpid.Client.Util;
using Qpid.Collections;
using Qpid.Framing;
using Qpid.Messaging;
@@ -41,11 +42,14 @@
private static int _nextSessionNumber = 0;
private int _sessionNumber;
+ private bool _suspended;
+ private object _suspensionLock = new object();
// Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
private int _nextConsumerNumber = 1;
- internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH;
+ public const int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+ public const int DEFAULT_PREFETCH_LOW_MARK = 2500;
private AMQConnection _connection;
@@ -55,9 +59,10 @@
private ushort _channelId;
- private int _defaultPrefetch = DEFAULT_PREFETCH;
+ private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+ private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
- private BlockingQueue _queue = new LinkedBlockingQueue();
+ private FlowControlQueue _queue;
private Dispatcher _dispatcher;
@@ -105,7 +110,7 @@
{
UnprocessedMessage message;
- while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.DequeueBlocking()) != null)
+ while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null)
{
//_queue.size()
DispatchMessage(message);
@@ -163,8 +168,9 @@
/// <param name="channelId">The channel id.</param>
/// <param name="transacted">if set to <c>true</c> [transacted].</param>
/// <param name="acknowledgeMode">The acknowledge mode.</param>
- /// <param name="defaultPrefetch">Default prefetch value</param>
- internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch)
+ /// <param name="defaultPrefetchHigh">Default prefetch high value</param>
+ /// <param name="defaultPrefetchLow">Default prefetch low value</param>
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
: this()
{
_sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
@@ -178,8 +184,26 @@
_acknowledgeMode = acknowledgeMode;
}
_channelId = channelId;
+ _defaultPrefetchHighMark = defaultPrefetchHigh;
+ _defaultPrefetchLowMark = defaultPrefetchLow;
+
+ if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
+ {
+ _queue = new FlowControlQueue(
+ _defaultPrefetchLowMark, _defaultPrefetchHighMark,
+ new ThresholdMethod(OnPrefetchLowMark),
+ new ThresholdMethod(OnPrefetchHighMark)
+ );
+ } else
+ {
+ // low and upper are the same
+ _queue = new FlowControlQueue(
+ _defaultPrefetchHighMark, _defaultPrefetchHighMark,
+ null, null
+ );
+ }
}
-
+
private AmqChannel()
{
_messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
@@ -269,19 +293,30 @@
public void Rollback()
{
- // FIXME: Fail over safety. Needs FailoverSupport?
- CheckNotClosed();
- CheckTransacted(); // throws IllegalOperationException if not a transacted session
+     // Rolls back the current transaction on this channel. Message flow is
+     // suspended around the Tx.Rollback exchange (unless it was already
+     // suspended) and resumed afterwards, even when the rollback itself fails.
+     // NOTE(review): the previous implementation called CheckNotClosed() before
+     // CheckTransacted(); confirm that dropping it here is intentional.
+     lock ( _suspensionLock )
+     {
+         CheckTransacted(); // throws IllegalOperationException if not a transacted session
- try
- {
- _connection.ConvenientProtocolWriter.SyncWrite(
- TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
- }
- catch (AMQException e)
- {
- throw new QpidException("Failed to rollback", e);
- }
+         try
+         {
+             // Remember whether flow was already suspended so that this call
+             // only undoes a suspension it performed itself.
+             bool suspended = IsSuspended;
+             if ( !suspended )
+                 Suspend(true);
+
+             try
+             {
+                 // todo: rollback dispatcher when TX support is added
+                 //if ( _dispatcher != null )
+                 //   _dispatcher.Rollback();
+
+                 _connection.ConvenientProtocolWriter.SyncWrite(
+                     TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+             }
+             finally
+             {
+                 // Resume flow on the failure path too; otherwise a failed
+                 // rollback would leave the channel suspended. If this call
+                 // throws as well, its exception replaces the rollback one.
+                 if ( !suspended )
+                     Suspend(false);
+             }
+         } catch ( AMQException e )
+         {
+             throw new QpidException("Failed to rollback", e);
+         }
+     }
}
public override void Close()
@@ -539,21 +574,26 @@
ReturnBouncedMessage(message);
} else
{
- _queue.EnqueueBlocking(message);
+ _queue.Enqueue(message);
}
}
public int DefaultPrefetch
{
- get
- {
- return _defaultPrefetch;
- }
- set
- {
- _defaultPrefetch = value;
- }
- }
+ get { return DefaultPrefetchHigh; }
+ }
+ public int DefaultPrefetchLow
+ {
+ get { return _defaultPrefetchLowMark; }
+ }
+ public int DefaultPrefetchHigh
+ {
+ get { return _defaultPrefetchHighMark; }
+ }
+ public bool IsSuspended
+ {
+ get { return _suspended; }
+ }
public ushort ChannelId
{
@@ -581,6 +621,7 @@
internal void Stop()
{
+ Suspend(true);
if (_dispatcher != null)
{
_dispatcher.StopDispatcher();
@@ -883,7 +924,7 @@
* @param multiple if true will acknowledge all messages up to and including the one specified by the
* delivery tag
*/
- public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
+ internal void AcknowledgeMessage(ulong deliveryTag, bool multiple)
{
AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);
if (_logger.IsDebugEnabled)
@@ -930,5 +971,39 @@
}
}
+
+ /// <summary>
+ /// Threshold callback for the prefetch low mark: logs a warning and
+ /// resumes message flow on this channel. Does nothing unless the
+ /// channel is in NoAcknowledge mode.
+ /// </summary>
+ /// <param name="count">Current item count reported by the caller; only used in the log message.</param>
+ private void OnPrefetchLowMark(int count)
+ {
+     if ( _acknowledgeMode != AcknowledgeMode.NoAcknowledge )
+         return;
+
+     string notice = "Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count;
+     _logger.Warn(notice);
+     Suspend(false);
+ }
+ /// <summary>
+ /// Threshold callback for the prefetch high mark: logs a warning and
+ /// suspends message flow on this channel. Does nothing unless the
+ /// channel is in NoAcknowledge mode.
+ /// </summary>
+ /// <param name="count">Current item count reported by the caller; only used in the log message.</param>
+ private void OnPrefetchHighMark(int count)
+ {
+     if ( _acknowledgeMode != AcknowledgeMode.NoAcknowledge )
+         return;
+
+     string notice = "Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count;
+     _logger.Warn(notice);
+     Suspend(true);
+ }
+
+ /// <summary>
+ /// Suspends or resumes message flow on this channel by writing a
+ /// Channel.Flow frame and waiting for the ChannelFlowOkBody reply.
+ /// The whole operation runs under the suspension lock.
+ /// </summary>
+ /// <param name="suspend"><c>true</c> to suspend flow, <c>false</c> to resume it.</param>
+ private void Suspend(bool suspend)
+ {
+     lock ( _suspensionLock )
+     {
+         if ( _logger.IsDebugEnabled )
+         {
+             string state = suspend ? "suspended" : "unsuspended";
+             _logger.Debug("Setting channel flow : " + state);
+         }
+
+         // The local flag is recorded before the frame is written.
+         _suspended = suspend;
+
+         // The frame is built with the inverse of 'suspend'.
+         bool active = !suspend;
+         AMQFrame flowFrame = ChannelFlowBody.CreateAMQFrame(_channelId, active);
+
+         Connection.ConvenientProtocolWriter.SyncWrite(flowFrame, typeof(ChannelFlowOkBody));
+     }
+ }
+
}
}
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs Sat May 26 10:35:51 2007
@@ -326,7 +326,7 @@
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_channel != null)
{
- // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
// received on the session
_channel.AcknowledgeMessage((ulong)DeliveryTag, true);
}
Added: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs?view=auto&rev=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs (added)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs Sat May 26 10:35:51 2007
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using Qpid.Collections;
+using Qpid.Common;
+
+namespace Qpid.Client.Util
+{
+ internal delegate void ThresholdMethod(int currentCount);
+
+ /// <summary>
+ /// Basic bounded queue used to implement prefetching.
+ /// The over-threshold callback fires once when the item count
+ /// reaches the upper bound; the under-threshold callback fires
+ /// once when the count subsequently drops back to the lower
+ /// bound, and never before the upper bound has been reached.
+ /// Notice we do the callbacks here asynchronously to
+ /// avoid adding more complexity to the channel impl.
+ /// </summary>
+ internal class FlowControlQueue
+ {
+     private BlockingQueue _queue = new LinkedBlockingQueue();
+     // Guards _itemCount and _overLimit so that a count change and the
+     // matching threshold transition are observed as one step.
+     private readonly object _stateLock = new object();
+     private int _itemCount;
+     // True from the moment the upper bound is reached until the count
+     // falls back to the lower bound.
+     private bool _overLimit;
+     private int _lowerBound;
+     private int _upperBound;
+     private ThresholdMethod _underThreshold;
+     private ThresholdMethod _overThreshold;
+
+     /// <summary>
+     /// Creates a new queue.
+     /// </summary>
+     /// <param name="lowerBound">Item count at or below which the under-threshold callback fires (after the upper bound was reached)</param>
+     /// <param name="upperBound">Item count at or above which the over-threshold callback fires</param>
+     /// <param name="underThreshold">Callback for the lower bound; may be null</param>
+     /// <param name="overThreshold">Callback for the upper bound; may be null</param>
+     public FlowControlQueue(
+         int lowerBound,
+         int upperBound,
+         ThresholdMethod underThreshold,
+         ThresholdMethod overThreshold
+         )
+     {
+         _lowerBound = lowerBound;
+         _upperBound = upperBound;
+         _underThreshold = underThreshold;
+         _overThreshold = overThreshold;
+     }
+
+     /// <summary>
+     /// Adds an item to the queue and, if this pushes the count up to the
+     /// upper bound for the first time since the last under-threshold
+     /// transition, invokes the over-threshold callback asynchronously.
+     /// </summary>
+     /// <param name="item">Item to add</param>
+     public void Enqueue(object item)
+     {
+         _queue.EnqueueBlocking(item);
+
+         int count;
+         bool crossed = false;
+         lock ( _stateLock )
+         {
+             count = ++_itemCount;
+             // >= rather than == so a crossing is never missed, and the flag
+             // keeps the callback from firing again while already over.
+             if ( !_overLimit && count >= _upperBound )
+             {
+                 _overLimit = true;
+                 crossed = true;
+             }
+         }
+
+         // NOTE(review): callbacks are started with BeginInvoke outside the
+         // lock; nothing here orders an over-threshold callback ahead of an
+         // under-threshold one that follows closely - confirm the callbacks
+         // tolerate that.
+         if ( crossed && _overThreshold != null )
+         {
+             _overThreshold.BeginInvoke(
+                 count, new AsyncCallback(OnAsyncCallEnd),
+                 _overThreshold
+                 );
+         }
+     }
+
+     /// <summary>
+     /// Removes the next item from the queue and, if the count has fallen
+     /// back to the lower bound after having reached the upper bound,
+     /// invokes the under-threshold callback asynchronously.
+     /// </summary>
+     /// <returns>The item returned by the underlying queue</returns>
+     public object Dequeue()
+     {
+         object item = _queue.DequeueBlocking();
+
+         int count;
+         bool crossed = false;
+         lock ( _stateLock )
+         {
+             count = --_itemCount;
+             // <= rather than == so the transition still happens when the
+             // lower and upper bounds are equal (the count then goes from
+             // the bound straight to one below it).
+             if ( _overLimit && count <= _lowerBound )
+             {
+                 _overLimit = false;
+                 crossed = true;
+             }
+         }
+
+         if ( crossed && _underThreshold != null )
+         {
+             _underThreshold.BeginInvoke(
+                 count, new AsyncCallback(OnAsyncCallEnd),
+                 _underThreshold
+                 );
+         }
+         return item;
+     }
+
+     // Completes the asynchronous delegate call started by BeginInvoke;
+     // the delegate itself is carried in AsyncState.
+     private void OnAsyncCallEnd(IAsyncResult res)
+     {
+         ThresholdMethod method = (ThresholdMethod)res.AsyncState;
+         method.EndInvoke(res);
+     }
+ }
+}
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj Sat May 26 10:35:51 2007
@@ -120,6 +120,7 @@
<Compile Include="Client\Transport\Socket\Blocking\SslSocketConnector.cs" />
<Compile Include="Client\Transport\Socket\Blocking\SocketConnector.cs" />
<Compile Include="Client\Transport\Socket\Blocking\ISocketConnector.cs" />
+ <Compile Include="Client\Util\FlowControlQueue.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="qms\BrokerInfo.cs" />
<Compile Include="qms\ConnectionInfo.cs" />
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/default.build Sat May 26 10:35:51 2007
@@ -23,7 +23,7 @@
<include name="${build.dir}/Qpid.Codec.dll" />
<include name="${build.dir}/Qpid.Common.dll" />
<include name="${build.dir}/Qpid.Messaging.dll" />
- <include name="${build.dir}/Org.Mentalis.Security.dll" />
+ <include name="${build.dir}/Org.Mentalis.Security.dll" />
</references>
</csc>
</target>
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs Sat May 26 10:35:51 2007
@@ -43,10 +43,23 @@
bool Transacted { get; }
/// <summary>
- /// Prefetch value to be used as the default for consumers created on this channel.
+ /// Prefetch value to be used as the default for
+ /// consumers created on this channel.
/// </summary>
- int DefaultPrefetch { get; set; }
-
+ int DefaultPrefetch { get; }
+
+ /// <summary>
+ /// Prefetch low value to be used as the default for
+ /// consumers created on this channel.
+ /// </summary>
+ int DefaultPrefetchLow { get; }
+
+ /// <summary>
+ /// Prefetch high value to be used as the default for
+ /// consumers created on this channel.
+ /// </summary>
+ int DefaultPrefetchHigh { get; }
+
/// <summary>
/// Declare a new exchange
/// </summary>
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IConnection.cs Sat May 26 10:35:51 2007
@@ -47,6 +47,7 @@
IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode);
IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch);
+ IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow);
void Start();
void Stop();
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs?view=diff&rev=541920&r1=541919&r2=541920
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs Sat May 26 10:35:51 2007
@@ -22,21 +22,21 @@
{
public class MessageConsumerBuilder
{
- public const int DEFAULT_PREFETCH_HIGH = 5000;
-
private bool _noLocal = false;
private bool _exclusive = false;
private bool _durable = false;
private string _subscriptionName = null;
private IChannel _channel;
private readonly string _queueName;
- private int _prefetchLow = 2500;
- private int _prefetchHigh = DEFAULT_PREFETCH_HIGH;
+ private int _prefetchLow;
+ private int _prefetchHigh;
public MessageConsumerBuilder(IChannel channel, string queueName)
{
_channel = channel;
_queueName = queueName;
+ _prefetchHigh = _channel.DefaultPrefetchHigh;
+ _prefetchLow = _channel.DefaultPrefetchLow;
}
public MessageConsumerBuilder WithPrefetchLow(int prefetchLow)