You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/05/05 14:29:16 UTC

svn commit: r653451 - /incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs

Author: aidan
Date: Mon May  5 05:29:15 2008
New Revision: 653451

URL: http://svn.apache.org/viewvc?rev=653451&view=rev
Log:
QPID-1022 Use synchronous writes to fix race conditions

Modified:
    incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs

Modified: incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=653451&r1=653450&r2=653451&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs Mon May  5 05:29:15 2008
@@ -888,10 +888,14 @@
         /// <param name="consumer"></param>
         private void RegisterConsumer(BasicMessageConsumer consumer)
         {
+            // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+            String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+            consumer.ConsumerTag = tag;
+            _consumers.Add(tag, consumer);
+            
             String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
-                                                  consumer.Exclusive, consumer.AcknowledgeMode);
-            consumer.ConsumerTag = consumerTag;
-            _consumers.Add(consumerTag, consumer);
+                                                  consumer.Exclusive, consumer.AcknowledgeMode, tag);
+            
         }
 
         internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
@@ -902,19 +906,17 @@
 
             AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
                                                               queueName, exchangeName,
-                                                              routingKey, true, args);
+                                                              routingKey, false, args);
             _replayFrames.Add(queueBind);
 
             lock (_connection.FailoverMutex)
             {
-                _connection.ProtocolWriter.Write(queueBind);
+                _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
             }
         }
 
-        private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+        private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
         {
-            // Need to generate a consumer tag on the client so we can exploit the nowait flag.
-            String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
             
             AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
                                                                     queueName, tag, noLocal,
@@ -958,13 +960,13 @@
                                         queueName, isDurable, isExclusive, isAutoDelete));
 
             AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
-                                                                    isAutoDelete, true, null);
+                                                                    isAutoDelete, false, null);
 
             _replayFrames.Add(queueDeclare);
 
             lock (_connection.FailoverMutex)
             {
-                _connection.ProtocolWriter.Write(queueDeclare);
+                _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
             }
         }