You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/01/04 22:19:26 UTC

svn commit: r895790 - /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs

Author: tabish
Date: Mon Jan  4 21:19:25 2010
New Revision: 895790

URL: http://svn.apache.org/viewvc?rev=895790&view=rev
Log:
Simplified the implementation of the MessageConsumer that was ported over from NMS.ActiveMQ

Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=895790&r1=895789&r2=895790&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Mon Jan  4 21:19:25 2010
@@ -27,11 +27,8 @@
 {
     public enum AckType
     {
-        DeliveredAck = 0, // Message delivered but not consumed
-        PoisonAck = 1, // Message could not be processed due to poison pill but discard anyway
-        ConsumedAck = 2, // Message consumed, discard
-        RedeliveredAck = 3, // Message has been Redelivered and is not yet poisoned.
-        IndividualAck = 4 // Only the given message is to be treated as consumed.
+        ConsumedAck = 1, // Message consumed, discard
+        IndividualAck = 2 // Only the given message is to be treated as consumed.
     }
 
     /// <summary>
@@ -49,8 +46,6 @@
         private Atomic<bool> started = new Atomic<bool>();
         private Atomic<bool> deliveringAcks = new Atomic<bool>();
 
-        private int maximumRedeliveryCount = 10;
-        private int redeliveryTimeout = 500;
         protected bool disposed = false;
         private int deliveredCounter = 0;
         private int additionalWindowSize = 0;
@@ -59,9 +54,6 @@
         private volatile bool synchronizationRegistered = false;
         private bool clearDispatchList = false;
 
-        private const int DEFAULT_REDELIVERY_DELAY = 0;
-        private const int DEFAULT_MAX_REDELIVERIES = 5;
-
         private event MessageListener listener;
 
         private IRedeliveryPolicy redeliveryPolicy;
@@ -86,18 +78,6 @@
             get { return info.ConsumerId; }
         }
 
-        public int MaximumRedeliveryCount
-        {
-            get { return maximumRedeliveryCount; }
-            set { maximumRedeliveryCount = value; }
-        }
-
-        public int RedeliveryTimeout
-        {
-            get { return redeliveryTimeout; }
-            set { redeliveryTimeout = value; }
-        }
-
         public int PrefetchSize
         {
             get { return this.info.PrefetchSize; }
@@ -247,17 +227,6 @@
             {
                 Tracer.Debug("Closing down the Consumer");
 
-                // Do we have any acks we need to send out before closing?
-                // Ack any delivered messages now.
-                if(!this.session.IsTransacted)
-                {
-                    DeliverAcks();
-                    if(this.IsAutoAcknowledgeBatch)
-                    {
-                        Acknowledge();
-                    }
-                }
-
                 if(!this.session.IsTransacted)
                 {
                     lock(this.dispatchedMessages)
@@ -363,29 +332,13 @@
 
             if(this.deliveringAcks.CompareAndSet(false, true))
             {
-                if(this.IsAutoAcknowledgeEach)
-                {
-                    lock(this.dispatchedMessages)
-                    {
-                        ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
-                        if(ack != null)
-                        {
-                            this.dispatchedMessages.Clear();
-                        }
-                        else
-                        {
-                            ack = this.pendingAck;
-                            this.pendingAck = null;
-                        }
-                    }
-                }
-                else if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
+                if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
                 {
                     ack = pendingAck;
                     pendingAck = null;
                 }
 
-                if(ack != null)
+                if(pendingAck != null)
                 {
                     MessageAck ackToSend = ack;
 
@@ -419,7 +372,7 @@
                         this.clearDispatchList = false;
                         this.unconsumedMessages.Clear();
 
-                        if(this.pendingAck != null && this.pendingAck.AckType == (byte) AckType.DeliveredAck)
+                        if(this.pendingAck != null)
                         {
                             // on resumption a pending delivered ack will be out of sync with
                             // re-deliveries.
@@ -452,7 +405,7 @@
                             }
                             catch(Exception e)
                             {
-                                if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || this.session.IsIndividualAcknowledge)
+                                if(this.session.IsAutoAcknowledge || this.session.IsIndividualAcknowledge)
                                 {
                                     // Redeliver the message
                                 }
@@ -596,7 +549,7 @@
 
         public void BeforeMessageIsConsumed(MessageDispatch dispatch)
         {
-            if(!IsAutoAcknowledgeBatch)
+            if(!this.session.IsAutoAcknowledge)
             {
                 lock(this.dispatchedMessages)
                 {
@@ -605,7 +558,7 @@
 
                 if(this.session.IsTransacted)
                 {
-                    this.AckLater(dispatch, AckType.DeliveredAck);
+                    this.AckLater(dispatch);
                 }
             }
         }
@@ -624,7 +577,8 @@
                     this.dispatchedMessages.Remove(dispatch);
                 }
 
-                AckLater(dispatch, AckType.DeliveredAck);
+				// TODO - Not sure if we need to ack this in stomp.
+                // AckLater(dispatch, AckType.ConsumedAck);
             }
             else
             {
@@ -632,42 +586,29 @@
                 {
                     // Do nothing.
                 }
-                else if(this.IsAutoAcknowledgeEach)
+                else if(this.session.IsAutoAcknowledge)
                 {
                     if(this.deliveringAcks.CompareAndSet(false, true))
                     {
                         lock(this.dispatchedMessages)
                         {
-                            if(this.dispatchedMessages.Count != 0)
-                            {
-                                MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
-                                if(ack != null)
-                                {
-                                    this.dispatchedMessages.Clear();
-                                    this.session.Connection.Oneway(ack);
-                                }
-                            }
+			                MessageAck ack = new MessageAck();
+			
+			                ack.AckType = (byte) AckType.ConsumedAck;
+			                ack.ConsumerId = this.info.ConsumerId;
+			                ack.Destination = dispatch.Destination;
+			                ack.LastMessageId = dispatch.Message.MessageId;
+			                ack.MessageCount = 1;
+			
+                            this.session.Connection.Oneway(ack);
                         }
+						
                         this.deliveringAcks.Value = false;
                     }
                 }
-                else if(this.IsAutoAcknowledgeBatch)
-                {
-                    AckLater(dispatch, AckType.ConsumedAck);
-                }
                 else if(this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
                 {
-                    bool messageAckedByConsumer = false;
-
-                    lock(this.dispatchedMessages)
-                    {
-                        messageAckedByConsumer = this.dispatchedMessages.Contains(dispatch);
-                    }
-
-                    if(messageAckedByConsumer)
-                    {
-                        AckLater(dispatch, AckType.DeliveredAck);
-                    }
+					// Do nothing.
                 }
                 else
                 {
@@ -676,7 +617,7 @@
             }
         }
 
-        private MessageAck MakeAckForAllDeliveredMessages(AckType type)
+        private MessageAck MakeAckForAllDeliveredMessages()
         {
             lock(this.dispatchedMessages)
             {
@@ -688,7 +629,7 @@
                 MessageDispatch dispatch = this.dispatchedMessages.First.Value;
                 MessageAck ack = new MessageAck();
 
-                ack.AckType = (byte) type;
+                ack.AckType = (byte) AckType.ConsumedAck;
                 ack.ConsumerId = this.info.ConsumerId;
                 ack.Destination = dispatch.Destination;
                 ack.LastMessageId = dispatch.Message.MessageId;
@@ -699,7 +640,7 @@
             }
         }
 
-        private void AckLater(MessageDispatch dispatch, AckType type)
+        private void AckLater(MessageDispatch dispatch)
         {
             // Don't acknowledge now, but we may need to let the broker know the
             // consumer got the message to expand the pre-fetch window
@@ -719,7 +660,7 @@
             MessageAck oldPendingAck = pendingAck;
 
             pendingAck = new MessageAck();
-            pendingAck.AckType = (byte) type;
+            pendingAck.AckType = (byte) AckType.ConsumedAck;
             pendingAck.ConsumerId = this.info.ConsumerId;
             pendingAck.Destination = dispatch.Destination;
             pendingAck.LastMessageId = dispatch.Message.MessageId;
@@ -734,31 +675,6 @@
             {
                 pendingAck.FirstMessageId = pendingAck.LastMessageId;
             }
-            else if(oldPendingAck.AckType == pendingAck.AckType)
-            {
-                pendingAck.FirstMessageId = oldPendingAck.FirstMessageId;
-            }
-            else
-            {
-                // old pending ack being superseded by ack of another type, if is is not a delivered
-                // ack and hence important, send it now so it is not lost.
-                if(oldPendingAck.AckType != (byte) AckType.DeliveredAck)
-                {
-                    if(Tracer.IsDebugEnabled)
-                    {
-                        Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
-                    }
-
-                    this.session.Connection.Oneway(oldPendingAck);
-                }
-                else
-                {
-                    if(Tracer.IsDebugEnabled)
-                    {
-                        Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
-                    }
-                }
-            }
 
             if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize))
             {
@@ -774,7 +690,7 @@
             lock(this.dispatchedMessages)
             {
                 // Acknowledge all messages so far.
-                MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+                MessageAck ack = MakeAckForAllDeliveredMessages();
 
                 if(ack == null)
                 {
@@ -828,7 +744,7 @@
 
                     redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
 
-                    MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
+//                    MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
 
                     foreach(MessageDispatch dispatch in this.dispatchedMessages)
                     {
@@ -839,39 +755,39 @@
                     if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
                        lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
                     {
-                        // We need to NACK the messages so that they get sent to the DLQ.
-                        MessageAck ack = new MessageAck();
-
-                        ack.AckType = (byte) AckType.PoisonAck;
-                        ack.ConsumerId = this.info.ConsumerId;
-                        ack.Destination = lastMd.Destination;
-                        ack.LastMessageId = lastMd.Message.MessageId;
-                        ack.MessageCount = this.dispatchedMessages.Count;
-                        ack.FirstMessageId = firstMsgId;
-
-                        this.session.Connection.Oneway(ack);
-
-                        // Adjust the window size.
-                        additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
+//                        // We need to NACK the messages so that they get sent to the DLQ.
+//                        MessageAck ack = new MessageAck();
+//
+//                        ack.AckType = (byte) AckType.PoisonAck;
+//                        ack.ConsumerId = this.info.ConsumerId;
+//                        ack.Destination = lastMd.Destination;
+//                        ack.LastMessageId = lastMd.Message.MessageId;
+//                        ack.MessageCount = this.dispatchedMessages.Count;
+//                        ack.FirstMessageId = firstMsgId;
+//
+//                        this.session.Connection.Oneway(ack);
+//
+//                        // Adjust the window size.
+//                        additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
 
                         this.redeliveryDelay = 0;
                     }
                     else
                     {
-                        // We only send a RedeliveryAck after the first redelivery
-                        if(currentRedeliveryCount > 0)
-                        {
-                            MessageAck ack = new MessageAck();
-
-                            ack.AckType = (byte) AckType.RedeliveredAck;
-                            ack.ConsumerId = this.info.ConsumerId;
-                            ack.Destination = lastMd.Destination;
-                            ack.LastMessageId = lastMd.Message.MessageId;
-                            ack.MessageCount = this.dispatchedMessages.Count;
-                            ack.FirstMessageId = firstMsgId;
-
-                            this.session.Connection.Oneway(ack);
-                        }
+//                        // We only send a RedeliveryAck after the first redelivery
+//                        if(currentRedeliveryCount > 0)
+//                        {
+//                            MessageAck ack = new MessageAck();
+//
+//                            ack.AckType = (byte) AckType.RedeliveredAck;
+//                            ack.ConsumerId = this.info.ConsumerId;
+//                            ack.Destination = lastMd.Destination;
+//                            ack.LastMessageId = lastMd.Message.MessageId;
+//                            ack.MessageCount = this.dispatchedMessages.Count;
+//                            ack.FirstMessageId = firstMsgId;
+//
+//                            this.session.Connection.Oneway(ack);
+//                        }
 
                         // stop the delivery of messages.
                         this.unconsumedMessages.Stop();
@@ -965,20 +881,6 @@
             }
         }
 
-        private bool IsAutoAcknowledgeEach
-        {
-            get
-            {
-                return this.session.IsAutoAcknowledge ||
-                       (this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue);
-            }
-        }
-
-        private bool IsAutoAcknowledgeBatch
-        {
-            get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
-        }
-
         #region Nested ISyncronization Types
 
         class MessageConsumerSynchronization : ISynchronization