You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2008/02/11 15:50:20 UTC
svn commit: r620496 - in
/incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client: AmqChannel.cs
BasicMessageConsumer.cs
Author: rupertlssmith
Date: Mon Feb 11 06:50:18 2008
New Revision: 620496
URL: http://svn.apache.org/viewvc?rev=620496&view=rev
Log:
QPID-729 : Added explicit list of unacked messages, acked on commit, rejected on roll-back.
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
Modified: incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=620496&r1=620495&r2=620496&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs Mon Feb 11 06:50:18 2008
@@ -457,7 +457,7 @@
foreach (BasicMessageConsumer consumer in _consumers.Values)
{
// Sends acknowledgement to server.
- consumer.AcknowledgeLastDelivered();
+ consumer.AcknowledgeDelivered();
}
// Commits outstanding messages sent and outstanding acknowledgements.
@@ -485,13 +485,16 @@
{
Suspend(true);
}
-
- // todo: rollback dispatcher when TX support is added
- //if ( _dispatcher != null )
- // _dispatcher.Rollback();
- _connection.ConvenientProtocolWriter.SyncWrite(
- TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+ // Reject up to message last delivered (if any) for each consumer.
+ // Need to send reject for messages delivered to consumers so far.
+ foreach (BasicMessageConsumer consumer in _consumers.Values)
+ {
+ // Sends reject to server for every unacked message.
+ consumer.RejectUnacked();
+ }
+
+ _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
if ( !suspended )
{
@@ -1012,6 +1015,15 @@
_connection.ProtocolWriter.Write(ackFrame);
}
+ public void RejectMessage(ulong deliveryTag, bool requeue)
+ {
+ if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted))
+ {
+ AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue);
+ _connection.ProtocolWriter.Write(rejectFrame);
+ }
+ }
+
/// <summary>
/// Handle a message that bounced from the server, creating
/// the corresponding exception and notifying the connection about it
@@ -1104,8 +1116,8 @@
/// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on
/// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks>
///
- /// <remarks>Exception swalled, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should
- /// fall through and termiante the loop, as it is a bug if it occurrs.</remarks>
+ /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled exception should
+ /// fall through and terminate the loop, as it is a bug if it occurs.</remarks>
private class Dispatcher
{
/// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary>
Modified: incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=620496&r1=620495&r2=620496&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Mon Feb 11 06:50:18 2008
@@ -20,6 +20,8 @@
*/
using System;
using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
using log4net;
using Apache.Qpid.Client.Message;
using Apache.Qpid.Collections;
@@ -106,10 +108,15 @@
private AmqChannel _channel;
+ // <summary>
+ // Tag of last message delivered, which should be acknowledged on commit in transaction mode.
+ // </summary>
+ //private long _lastDeliveryTag;
+
/// <summary>
- /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode.
+ /// Explicit list of all received but un-acked messages in a transaction. Used to ensure acking is completed when transaction is committed.
/// </summary>
- private long _lastDeliveryTag;
+ private LinkedList<long> _receivedDeliveryTags;
/// <summary>
/// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
@@ -135,6 +142,11 @@
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
+
+ if (_acknowledgeMode == AcknowledgeMode.SessionTransacted)
+ {
+ _receivedDeliveryTags = new LinkedList<long>();
+ }
}
#region IMessageConsumer Members
@@ -391,13 +403,24 @@
/// <summary>
/// Acknowledge up to last message delivered (if any). Used when commiting.
/// </summary>
- internal void AcknowledgeLastDelivered()
+ internal void AcknowledgeDelivered()
{
- if (_lastDeliveryTag > 0)
+ foreach (long tag in _receivedDeliveryTags)
{
- _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // XXX evil cast
- _lastDeliveryTag = -1;
+ _channel.AcknowledgeMessage((ulong)tag, false);
}
+
+ _receivedDeliveryTags.Clear();
+ }
+
+ internal void RejectUnacked()
+ {
+ foreach (long tag in _receivedDeliveryTags)
+ {
+ _channel.RejectMessage((ulong)tag, true);
+ }
+
+ _receivedDeliveryTags.Clear();
}
private void PreDeliver(AbstractQmsMessage msg)
@@ -442,7 +465,7 @@
break;
case AcknowledgeMode.SessionTransacted:
- _lastDeliveryTag = msg.DeliveryTag;
+ _receivedDeliveryTags.AddLast(msg.DeliveryTag);
break;
}
}