You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/06 21:21:15 UTC
svn commit: r1616314 - in
/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp:
MessageConsumer.cs Session.cs
Author: tabish
Date: Wed Aug 6 19:21:15 2014
New Revision: 1616314
URL: http://svn.apache.org/r1616314
Log:
https://issues.apache.org/jira/browse/AMQNET-454
Apply: https://issues.apache.org/jira/secure/attachment/12660198/Apache.NMS.AMQP-26-hook-in-session-ack.patch
Modified:
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1616314&r1=1616313&r2=1616314&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Wed Aug 6 19:21:15 2014
@@ -196,6 +196,10 @@ namespace Apache.NMS.Amqp
{
nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage);
nmsMessage.NMSReplyTo = replyToDestination;
+ if (this.session.IsAutoAcknowledge)
+ {
+ this.session.Acknowledge();
+ }
}
return nmsMessage;
}
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs?rev=1616314&r1=1616313&r2=1616314&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs Wed Aug 6 19:21:15 2014
@@ -69,6 +69,10 @@ namespace Apache.NMS.Amqp
// TODO: transactions
throw new NotSupportedException("Transactions are not supported by Qpid/Amqp");
}
+ else if (acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge)
+ {
+ this.acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+ }
if (connection.IsStarted)
{
this.Start();
@@ -76,6 +80,36 @@ namespace Apache.NMS.Amqp
connection.AddSession(this);
}
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return this.acknowledgementMode; }
+ }
+
+ public bool IsClientAcknowledge
+ {
+ get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; }
+ }
+
+ public bool IsAutoAcknowledge
+ {
+ get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; }
+ }
+
+ public bool IsDupsOkAcknowledge
+ {
+ get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; }
+ }
+
+ public bool IsIndividualAcknowledge
+ {
+ get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; }
+ }
+
+ public bool IsTransacted
+ {
+ get { return this.acknowledgementMode == AcknowledgementMode.Transactional; }
+ }
+
#region IStartable Methods
/// <summary>
/// Create new unmanaged session and start senders and receivers
@@ -503,11 +537,6 @@ namespace Apache.NMS.Amqp
get { return acknowledgementMode == AcknowledgementMode.Transactional; }
}
- public AcknowledgementMode AcknowledgementMode
- {
- get { throw new NotImplementedException(); }
- }
-
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
{
@@ -589,6 +618,35 @@ namespace Apache.NMS.Amqp
return qpidSession.CreateSender(address);
}
+ //
+ // Acknowledges all outstanding messages that have been received
+ // by the application on this session.
+ //
+ // @param sync if true, blocks until the acknowledgement has been
+ // processed by the server
+ //
+ public void Acknowledge()
+ {
+ qpidSession.Acknowledge(false);
+ }
+
+ public void Acknowledge(bool sync)
+ {
+ qpidSession.Acknowledge(sync);
+ }
+
+ //
+ // These flavors of acknowledge are available in the qpid messaging
+ // interface but not exposed to the NMS message/session stack.
+ //
+ // Acknowledges the specified message.
+ //
+ // void acknowledge(Message&, bool sync=false);
+ //
+ // Acknowledges all message up to the specified message.
+ //
+ // void acknowledgeUpTo(Message&, bool sync=false);
+
#region Transaction State Events
public event SessionTxEventDelegate TransactionStartedListener;