You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2008/02/11 15:50:20 UTC

svn commit: r620496 - in /incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client: AmqChannel.cs BasicMessageConsumer.cs

Author: rupertlssmith
Date: Mon Feb 11 06:50:18 2008
New Revision: 620496

URL: http://svn.apache.org/viewvc?rev=620496&view=rev
Log:
QPID-729 : Added explicit list of unacked messages, acked on commit, rejected on roll-back.

Modified:
    incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs

Modified: incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=620496&r1=620495&r2=620496&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs Mon Feb 11 06:50:18 2008
@@ -457,7 +457,7 @@
                 foreach (BasicMessageConsumer consumer  in _consumers.Values)
                 {
                     // Sends acknowledgement to server.
-                    consumer.AcknowledgeLastDelivered();
+                    consumer.AcknowledgeDelivered();
                 }
 
                 // Commits outstanding messages sent and outstanding acknowledgements.
@@ -485,13 +485,16 @@
                     {
                         Suspend(true);
                     }
-                 
-                    // todo: rollback dispatcher when TX support is added
-                    //if ( _dispatcher != null )
-                    //   _dispatcher.Rollback();
 
-                    _connection.ConvenientProtocolWriter.SyncWrite(
-                                                                   TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+                    // Reject every message delivered to each consumer that has not yet been acknowledged.
+                    // The rejects must be sent before the rollback itself is sent to the server.
+                    foreach (BasicMessageConsumer consumer  in _consumers.Values)
+                    {
+                        // Sends a reject (with requeue) to the server for each unacked message.
+                        consumer.RejectUnacked();
+                    }
+
+                    _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
 
                     if ( !suspended )
                     {
@@ -1012,6 +1015,15 @@
             _connection.ProtocolWriter.Write(ackFrame);
         }
 
+        public void RejectMessage(ulong deliveryTag, bool requeue)
+        {
+            if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted))
+            {
+                AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue);
+                _connection.ProtocolWriter.Write(rejectFrame);
+            }
+        }
+        
         /// <summary>
         /// Handle a message that bounced from the server, creating
         /// the corresponding exception and notifying the connection about it
@@ -1104,8 +1116,8 @@
         /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on 
         /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks>
         ///
-        /// <remarks>Exception swalled, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should
-        /// fall through and termiante the loop, as it is a bug if it occurrs.</remarks>
+        /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled exception should
+        /// fall through and terminate the loop, as it is a bug if it occurs.</remarks>
         private class Dispatcher
         {            
             /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary>

Modified: incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=620496&r1=620495&r2=620496&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Mon Feb 11 06:50:18 2008
@@ -20,6 +20,8 @@
  */
 using System;
 using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
 using log4net;
 using Apache.Qpid.Client.Message;
 using Apache.Qpid.Collections;
@@ -106,10 +108,15 @@
 
         private AmqChannel _channel;
 
+        // <summary>
+        // Tag of last message delivered, which should be acknowledged on commit in transaction mode.
+        // </summary>
+        //private long _lastDeliveryTag;
+
         /// <summary>
-        /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode.
+        /// Explicit list of all received but un-acked messages in a transaction. Used to ensure acking is completed when transaction is committed.
         /// </summary>
-        private long _lastDeliveryTag;
+        private LinkedList<long> _receivedDeliveryTags;
 
         /// <summary>
         /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
@@ -135,6 +142,11 @@
             _prefetchHigh = prefetchHigh;
             _prefetchLow = prefetchLow;
             _exclusive = exclusive;
+
+            if (_acknowledgeMode == AcknowledgeMode.SessionTransacted)
+            {
+                _receivedDeliveryTags = new LinkedList<long>();
+            }
         }
 
         #region IMessageConsumer Members
@@ -391,13 +403,24 @@
         /// <summary>
         /// Acknowledge up to last message delivered (if any). Used when commiting.
         /// </summary>
-        internal void AcknowledgeLastDelivered()
+        internal void AcknowledgeDelivered()
         {
-            if (_lastDeliveryTag > 0)
+            foreach (long tag in _receivedDeliveryTags)
             {
-                _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // XXX evil cast
-                _lastDeliveryTag = -1;
+                _channel.AcknowledgeMessage((ulong)tag, false);
             }
+
+            _receivedDeliveryTags.Clear();
+        }
+
+        internal void RejectUnacked()
+        {
+            foreach (long tag in _receivedDeliveryTags)
+            {
+                _channel.RejectMessage((ulong)tag, true);
+            }
+
+            _receivedDeliveryTags.Clear();
         }
 
         private void PreDeliver(AbstractQmsMessage msg)
@@ -442,7 +465,7 @@
                     break;
                 
                 case AcknowledgeMode.SessionTransacted:
-                    _lastDeliveryTag = msg.DeliveryTag;
+                    _receivedDeliveryTags.AddLast(msg.DeliveryTag);
                     break;
             }
         }