You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/01/04 22:19:26 UTC
svn commit: r895790 -
/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
Author: tabish
Date: Mon Jan 4 21:19:25 2010
New Revision: 895790
URL: http://svn.apache.org/viewvc?rev=895790&view=rev
Log:
Simplified the Implementation of the MessageConsumer that was ported over the NMS.ActiveMQ
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=895790&r1=895789&r2=895790&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Mon Jan 4 21:19:25 2010
@@ -27,11 +27,8 @@
{
public enum AckType
{
- DeliveredAck = 0, // Message delivered but not consumed
- PoisonAck = 1, // Message could not be processed due to poison pill but discard anyway
- ConsumedAck = 2, // Message consumed, discard
- RedeliveredAck = 3, // Message has been Redelivered and is not yet poisoned.
- IndividualAck = 4 // Only the given message is to be treated as consumed.
+ ConsumedAck = 1, // Message consumed, discard
+ IndividualAck = 2 // Only the given message is to be treated as consumed.
}
/// <summary>
@@ -49,8 +46,6 @@
private Atomic<bool> started = new Atomic<bool>();
private Atomic<bool> deliveringAcks = new Atomic<bool>();
- private int maximumRedeliveryCount = 10;
- private int redeliveryTimeout = 500;
protected bool disposed = false;
private int deliveredCounter = 0;
private int additionalWindowSize = 0;
@@ -59,9 +54,6 @@
private volatile bool synchronizationRegistered = false;
private bool clearDispatchList = false;
- private const int DEFAULT_REDELIVERY_DELAY = 0;
- private const int DEFAULT_MAX_REDELIVERIES = 5;
-
private event MessageListener listener;
private IRedeliveryPolicy redeliveryPolicy;
@@ -86,18 +78,6 @@
get { return info.ConsumerId; }
}
- public int MaximumRedeliveryCount
- {
- get { return maximumRedeliveryCount; }
- set { maximumRedeliveryCount = value; }
- }
-
- public int RedeliveryTimeout
- {
- get { return redeliveryTimeout; }
- set { redeliveryTimeout = value; }
- }
-
public int PrefetchSize
{
get { return this.info.PrefetchSize; }
@@ -247,17 +227,6 @@
{
Tracer.Debug("Closing down the Consumer");
- // Do we have any acks we need to send out before closing?
- // Ack any delivered messages now.
- if(!this.session.IsTransacted)
- {
- DeliverAcks();
- if(this.IsAutoAcknowledgeBatch)
- {
- Acknowledge();
- }
- }
-
if(!this.session.IsTransacted)
{
lock(this.dispatchedMessages)
@@ -363,29 +332,13 @@
if(this.deliveringAcks.CompareAndSet(false, true))
{
- if(this.IsAutoAcknowledgeEach)
- {
- lock(this.dispatchedMessages)
- {
- ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
- if(ack != null)
- {
- this.dispatchedMessages.Clear();
- }
- else
- {
- ack = this.pendingAck;
- this.pendingAck = null;
- }
- }
- }
- else if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
+ if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
{
ack = pendingAck;
pendingAck = null;
}
- if(ack != null)
+ if(pendingAck != null)
{
MessageAck ackToSend = ack;
@@ -419,7 +372,7 @@
this.clearDispatchList = false;
this.unconsumedMessages.Clear();
- if(this.pendingAck != null && this.pendingAck.AckType == (byte) AckType.DeliveredAck)
+ if(this.pendingAck != null)
{
// on resumption a pending delivered ack will be out of sync with
// re-deliveries.
@@ -452,7 +405,7 @@
}
catch(Exception e)
{
- if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || this.session.IsIndividualAcknowledge)
+ if(this.session.IsAutoAcknowledge || this.session.IsIndividualAcknowledge)
{
// Redeliver the message
}
@@ -596,7 +549,7 @@
public void BeforeMessageIsConsumed(MessageDispatch dispatch)
{
- if(!IsAutoAcknowledgeBatch)
+ if(!this.session.IsAutoAcknowledge)
{
lock(this.dispatchedMessages)
{
@@ -605,7 +558,7 @@
if(this.session.IsTransacted)
{
- this.AckLater(dispatch, AckType.DeliveredAck);
+ this.AckLater(dispatch);
}
}
}
@@ -624,7 +577,8 @@
this.dispatchedMessages.Remove(dispatch);
}
- AckLater(dispatch, AckType.DeliveredAck);
+ // TODO - Not sure if we need to ack this in stomp.
+ // AckLater(dispatch, AckType.ConsumedAck);
}
else
{
@@ -632,42 +586,29 @@
{
// Do nothing.
}
- else if(this.IsAutoAcknowledgeEach)
+ else if(this.session.IsAutoAcknowledge)
{
if(this.deliveringAcks.CompareAndSet(false, true))
{
lock(this.dispatchedMessages)
{
- if(this.dispatchedMessages.Count != 0)
- {
- MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
- if(ack != null)
- {
- this.dispatchedMessages.Clear();
- this.session.Connection.Oneway(ack);
- }
- }
+ MessageAck ack = new MessageAck();
+
+ ack.AckType = (byte) AckType.ConsumedAck;
+ ack.ConsumerId = this.info.ConsumerId;
+ ack.Destination = dispatch.Destination;
+ ack.LastMessageId = dispatch.Message.MessageId;
+ ack.MessageCount = 1;
+
+ this.session.Connection.Oneway(ack);
}
+
this.deliveringAcks.Value = false;
}
}
- else if(this.IsAutoAcknowledgeBatch)
- {
- AckLater(dispatch, AckType.ConsumedAck);
- }
else if(this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
{
- bool messageAckedByConsumer = false;
-
- lock(this.dispatchedMessages)
- {
- messageAckedByConsumer = this.dispatchedMessages.Contains(dispatch);
- }
-
- if(messageAckedByConsumer)
- {
- AckLater(dispatch, AckType.DeliveredAck);
- }
+ // Do nothing.
}
else
{
@@ -676,7 +617,7 @@
}
}
- private MessageAck MakeAckForAllDeliveredMessages(AckType type)
+ private MessageAck MakeAckForAllDeliveredMessages()
{
lock(this.dispatchedMessages)
{
@@ -688,7 +629,7 @@
MessageDispatch dispatch = this.dispatchedMessages.First.Value;
MessageAck ack = new MessageAck();
- ack.AckType = (byte) type;
+ ack.AckType = (byte) AckType.ConsumedAck;
ack.ConsumerId = this.info.ConsumerId;
ack.Destination = dispatch.Destination;
ack.LastMessageId = dispatch.Message.MessageId;
@@ -699,7 +640,7 @@
}
}
- private void AckLater(MessageDispatch dispatch, AckType type)
+ private void AckLater(MessageDispatch dispatch)
{
// Don't acknowledge now, but we may need to let the broker know the
// consumer got the message to expand the pre-fetch window
@@ -719,7 +660,7 @@
MessageAck oldPendingAck = pendingAck;
pendingAck = new MessageAck();
- pendingAck.AckType = (byte) type;
+ pendingAck.AckType = (byte) AckType.ConsumedAck;
pendingAck.ConsumerId = this.info.ConsumerId;
pendingAck.Destination = dispatch.Destination;
pendingAck.LastMessageId = dispatch.Message.MessageId;
@@ -734,31 +675,6 @@
{
pendingAck.FirstMessageId = pendingAck.LastMessageId;
}
- else if(oldPendingAck.AckType == pendingAck.AckType)
- {
- pendingAck.FirstMessageId = oldPendingAck.FirstMessageId;
- }
- else
- {
- // old pending ack being superseded by ack of another type, if is is not a delivered
- // ack and hence important, send it now so it is not lost.
- if(oldPendingAck.AckType != (byte) AckType.DeliveredAck)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
- }
-
- this.session.Connection.Oneway(oldPendingAck);
- }
- else
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
- }
- }
- }
if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize))
{
@@ -774,7 +690,7 @@
lock(this.dispatchedMessages)
{
// Acknowledge all messages so far.
- MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+ MessageAck ack = MakeAckForAllDeliveredMessages();
if(ack == null)
{
@@ -828,7 +744,7 @@
redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
- MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
+// MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
foreach(MessageDispatch dispatch in this.dispatchedMessages)
{
@@ -839,39 +755,39 @@
if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
{
- // We need to NACK the messages so that they get sent to the DLQ.
- MessageAck ack = new MessageAck();
-
- ack.AckType = (byte) AckType.PoisonAck;
- ack.ConsumerId = this.info.ConsumerId;
- ack.Destination = lastMd.Destination;
- ack.LastMessageId = lastMd.Message.MessageId;
- ack.MessageCount = this.dispatchedMessages.Count;
- ack.FirstMessageId = firstMsgId;
-
- this.session.Connection.Oneway(ack);
-
- // Adjust the window size.
- additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
+// // We need to NACK the messages so that they get sent to the DLQ.
+// MessageAck ack = new MessageAck();
+//
+// ack.AckType = (byte) AckType.PoisonAck;
+// ack.ConsumerId = this.info.ConsumerId;
+// ack.Destination = lastMd.Destination;
+// ack.LastMessageId = lastMd.Message.MessageId;
+// ack.MessageCount = this.dispatchedMessages.Count;
+// ack.FirstMessageId = firstMsgId;
+//
+// this.session.Connection.Oneway(ack);
+//
+// // Adjust the window size.
+// additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
this.redeliveryDelay = 0;
}
else
{
- // We only send a RedeliveryAck after the first redelivery
- if(currentRedeliveryCount > 0)
- {
- MessageAck ack = new MessageAck();
-
- ack.AckType = (byte) AckType.RedeliveredAck;
- ack.ConsumerId = this.info.ConsumerId;
- ack.Destination = lastMd.Destination;
- ack.LastMessageId = lastMd.Message.MessageId;
- ack.MessageCount = this.dispatchedMessages.Count;
- ack.FirstMessageId = firstMsgId;
-
- this.session.Connection.Oneway(ack);
- }
+// // We only send a RedeliveryAck after the first redelivery
+// if(currentRedeliveryCount > 0)
+// {
+// MessageAck ack = new MessageAck();
+//
+// ack.AckType = (byte) AckType.RedeliveredAck;
+// ack.ConsumerId = this.info.ConsumerId;
+// ack.Destination = lastMd.Destination;
+// ack.LastMessageId = lastMd.Message.MessageId;
+// ack.MessageCount = this.dispatchedMessages.Count;
+// ack.FirstMessageId = firstMsgId;
+//
+// this.session.Connection.Oneway(ack);
+// }
// stop the delivery of messages.
this.unconsumedMessages.Stop();
@@ -965,20 +881,6 @@
}
}
- private bool IsAutoAcknowledgeEach
- {
- get
- {
- return this.session.IsAutoAcknowledge ||
- (this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue);
- }
- }
-
- private bool IsAutoAcknowledgeBatch
- {
- get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
- }
-
#region Nested ISyncronization Types
class MessageConsumerSynchronization : ISynchronization