You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/05/05 14:29:16 UTC
svn commit: r653451 -
/incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
Author: aidan
Date: Mon May 5 05:29:15 2008
New Revision: 653451
URL: http://svn.apache.org/viewvc?rev=653451&view=rev
Log:
QPID-1022 Use synchronous writes to fix race conditions
Modified:
incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
Modified: incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=653451&r1=653450&r2=653451&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs Mon May 5 05:29:15 2008
@@ -888,10 +888,14 @@
/// <param name="consumer"></param>
private void RegisterConsumer(BasicMessageConsumer consumer)
{
+ // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+ String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+ consumer.ConsumerTag = tag;
+ _consumers.Add(tag, consumer);
+
String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
- consumer.Exclusive, consumer.AcknowledgeMode);
- consumer.ConsumerTag = consumerTag;
- _consumers.Add(consumerTag, consumer);
+ consumer.Exclusive, consumer.AcknowledgeMode, tag);
+
}
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
@@ -902,19 +906,17 @@
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName, exchangeName,
- routingKey, true, args);
+ routingKey, false, args);
_replayFrames.Add(queueBind);
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueBind);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
}
}
- private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+ private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
{
- // Need to generate a consumer tag on the client so we can exploit the nowait flag.
- String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
queueName, tag, noLocal,
@@ -958,13 +960,13 @@
queueName, isDurable, isExclusive, isAutoDelete));
AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
- isAutoDelete, true, null);
+ isAutoDelete, false, null);
_replayFrames.Add(queueDeclare);
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueDeclare);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
}
}