You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/08 17:27:16 UTC

svn commit: r961805 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp: ./ Commands/ State/

Author: tabish
Date: Thu Jul  8 15:27:15 2010
New Revision: 961805

URL: http://svn.apache.org/viewvc?rev=961805&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-254

Adds more failover handling; basic failover is working now.

Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs Thu Jul  8 15:27:15 2010
@@ -19,6 +19,7 @@ using System;
 using System.Collections;
 
 using Apache.NMS.Util;
+using Apache.NMS.Stomp.State;
 
 namespace Apache.NMS.Stomp.Commands
 {
@@ -264,6 +265,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processMessage( this );
+        }
+
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs Thu Jul  8 15:27:15 2010
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections;
+using Apache.NMS.Stomp.State;
 
 namespace Apache.NMS.Stomp.Commands
 {
@@ -75,6 +76,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processConnectionError( this );
+        }
+
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs Thu Jul  8 15:27:15 2010
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections;
+using Apache.NMS.Stomp.State;
 
 namespace Apache.NMS.Stomp.Commands
 {
@@ -91,6 +92,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processAddConnection( this );
+        }
+
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs Thu Jul  8 15:27:15 2010
@@ -19,6 +19,7 @@ using System;
 using System.Collections;
 
 using Apache.NMS;
+using Apache.NMS.Stomp.State;
 
 namespace Apache.NMS.Stomp.Commands
 {
@@ -167,6 +168,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processAddConsumer( this );
+        }
+
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs Thu Jul  8 15:27:15 2010
@@ -18,6 +18,8 @@
 using System;
 using System.Collections;
 
+using Apache.NMS.Stomp.State;
+
 namespace Apache.NMS.Stomp.Commands
 {
     public class MessageAck : BaseCommand
@@ -115,6 +117,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processMessageAck( this );
+        }
+
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs Thu Jul  8 15:27:15 2010
@@ -18,6 +18,8 @@
 using System;
 using System.Collections;
 
+using Apache.NMS.Stomp.State;
+
 namespace Apache.NMS.Stomp.Commands
 {
     public class MessageDispatch : BaseCommand
@@ -133,6 +135,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processMessageDispatch( this );
+        }
+
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs Thu Jul  8 15:27:15 2010
@@ -18,6 +18,8 @@
 using System;
 using System.Collections;
 
+using Apache.NMS.Stomp.State;
+
 namespace Apache.NMS.Stomp.Commands
 {
     public class ProducerInfo : BaseCommand
@@ -83,6 +85,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processAddProducer( this );
+        }
+
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs Thu Jul  8 15:27:15 2010
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections;
+using Apache.NMS.Stomp.State;
 
 namespace Apache.NMS.Stomp.Commands
 {
@@ -61,6 +62,29 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        ///
+        /// <summary>
+        ///  Allows a Visitor to visit this command and return a response to the
+        ///  command based on the command type being visited.  The command will call
+        ///  the proper processXXX method in the visitor.
+        /// </summary>
+        ///
+        public override Response visit(ICommandVisitor visitor)
+        {
+            switch(objectId.GetDataStructureType())
+            {
+                case DataStructureTypes.ConnectionIdType:
+                    return visitor.processRemoveConnection((ConnectionId) objectId);
+                case DataStructureTypes.SessionIdType:
+                    return visitor.processRemoveSession((SessionId) objectId);
+                case DataStructureTypes.ConsumerIdType:
+                    return visitor.processRemoveConsumer((ConsumerId) objectId);
+                case DataStructureTypes.ProducerIdType:
+                    return visitor.processRemoveProducer((ProducerId) objectId);
+                default:
+                    throw new IOException("Unknown remove command type: " + objectId.GetDataStructureType());
+            }
+        }
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs Thu Jul  8 15:27:15 2010
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections;
+using Apache.NMS.Stomp.State;
 
 namespace Apache.NMS.Stomp.Commands
 {
@@ -92,6 +93,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processRemoveSubscriptionInfo( this );
+        }
+
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs Thu Jul  8 15:27:15 2010
@@ -18,6 +18,8 @@
 using System;
 using System.Collections;
 
+using Apache.NMS.Stomp.State;
+
 namespace Apache.NMS.Stomp.Commands
 {
     public class SessionInfo : BaseCommand
@@ -74,6 +76,11 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return visitor.processAddSession( this );
+        }
+
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs Thu Jul  8 15:27:15 2010
@@ -18,6 +18,8 @@
 using System;
 using System.Collections;
 
+using Apache.NMS.Stomp.State;
+
 namespace Apache.NMS.Stomp.Commands
 {
     public class SubscriptionInfo : BaseDataStructure

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs Thu Jul  8 15:27:15 2010
@@ -18,6 +18,8 @@
 using System;
 using System.Collections;
 
+using Apache.NMS.Stomp.State;
+
 namespace Apache.NMS.Stomp.Commands
 {
     public class TransactionInfo : BaseCommand
@@ -87,6 +89,20 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        public override Response visit(ICommandVisitor visitor)
+        {
+            switch(type)
+            {
+                case TransactionInfo.BEGIN:
+                    return visitor.processBeginTransaction(this);
+                case TransactionInfo.COMMIT:
+                    return visitor.processCommitTransaction(this);
+                case TransactionInfo.ROLLBACK:
+                    return visitor.processRollbackTransaction(this);
+                default:
+                    throw new IOException("Transaction info type unknown: " + type);
+            }
+        }
     };
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Thu Jul  8 15:27:15 2010
@@ -20,6 +20,7 @@ using System.Collections;
 using System.Threading;
 using Apache.NMS.Stomp.Commands;
 using Apache.NMS.Stomp.Transport;
+using Apache.NMS.Stomp.Transport.Failover;
 using Apache.NMS.Stomp.Util;
 using Apache.NMS.Util;
 
@@ -61,6 +62,7 @@ namespace Apache.NMS.Stomp
 		private ConnectionMetaData metaData = null;
 		private bool disposed = false;
 		private IdGenerator clientIdGenerator;
+        private CountDownLatch transportInterruptionProcessingComplete;
 
 		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
 		{
@@ -532,6 +534,9 @@ namespace Apache.NMS.Stomp
 		{
 			if(command is MessageDispatch)
 			{
+                // We wait if the Connection is still processing interruption
+                // code to reset the MessageConsumers.
+                WaitForTransportInterruptionProcessingToComplete();
 				DispatchMessage((MessageDispatch) command);
 			}
 			else if(command is ConnectionError)
@@ -606,6 +611,12 @@ namespace Apache.NMS.Stomp
 		{
 			Tracer.Debug("Transport has been Interrupted.");
 
+            this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
+            }
+
 			foreach(Session session in this.sessions)
 			{
 				session.ClearMessagesInProgress();
@@ -682,5 +693,28 @@ namespace Apache.NMS.Stomp
 			answer.SessionId = sessionId;
 			return answer;
 		}
+
+        private void WaitForTransportInterruptionProcessingToComplete()
+        {
+            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+            if(cdl != null)
+            {
+                if(!closed && cdl.Remaining > 0)
+                {
+                    Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
+                                "processing (" + cdl.Remaining + ") to complete..");
+                    cdl.await(TimeSpan.FromSeconds(10));
+                }
+            }
+        }
+
+        internal void TransportInterruptionProcessingComplete()
+        {
+            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+            if(cdl != null)
+            {
+                cdl.countDown();
+            }
+        }
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Thu Jul  8 15:27:15 2010
@@ -53,6 +53,7 @@ namespace Apache.NMS.Stomp
         private int dispatchedCount = 0;
         private volatile bool synchronizationRegistered = false;
         private bool clearDispatchList = false;
+        private bool inProgressClearRequiredFlag;
 
         private event MessageListener listener;
 
@@ -314,16 +315,38 @@ namespace Apache.NMS.Stomp
             this.unconsumedMessages.Stop();
         }
 
-        public void ClearMessagesInProgress()
+        internal void InProgressClearRequired()
         {
-            // we are called from inside the transport reconnection logic
-            // which involves us clearing all the connections' consumers
-            // dispatch lists and clearing them
-            // so rather than trying to grab a mutex (which could be already
-            // owned by the message listener calling the send) we will just set
-            // a flag so that the list can be cleared as soon as the
-            // dispatch thread is ready to flush the dispatch list
-            this.clearDispatchList = true;
+            inProgressClearRequiredFlag = true;
+            // deal with delivered messages async to avoid lock contention with in progress acks
+            clearDispatchList = true;
+        }
+
+        internal void ClearMessagesInProgress()
+        {
+            if(inProgressClearRequiredFlag)
+            {
+                // Called from a thread in the ThreadPool, so we wait until we can
+                // get a lock on the unconsumed list then we clear it.
+                lock(this.unconsumedMessages)
+                {
+                    if(inProgressClearRequiredFlag)
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
+                                         this.unconsumedMessages.Count + ") on transport interrupt");
+                        }
+
+                        this.unconsumedMessages.Clear();
+                        this.synchronizationRegistered = false;
+
+                        // allow dispatch on this connection to resume
+                        this.session.Connection.TransportInterruptionProcessingComplete();
+                        this.inProgressClearRequiredFlag = false;
+                    }
+                }
+            }
         }
 
         public void DeliverAcks()
@@ -744,8 +767,6 @@ namespace Apache.NMS.Stomp
 
                     redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
 
-//                    MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
-
                     foreach(MessageDispatch dispatch in this.dispatchedMessages)
                     {
                         // Allow the message to update its internal to reflect a Rollback.

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs Thu Jul  8 15:27:15 2010
@@ -75,7 +75,7 @@ namespace Apache.NMS.Stomp
 
             this.executor = new SessionExecutor(this, this.consumers);
         }
-        
+
         ~Session()
         {
             Dispose(false);
@@ -757,15 +757,33 @@ namespace Apache.NMS.Stomp
                 this.executor.ClearMessagesInProgress();
             }
 
+            if(Transacted)
+            {
+                this.transactionContext.ResetTransactionInProgress();
+            }
+
             lock(this.consumers.SyncRoot)
             {
                 foreach(MessageConsumer consumer in this.consumers)
                 {
-                    consumer.ClearMessagesInProgress();
+                    consumer.InProgressClearRequired();
+                    ThreadPool.QueueUserWorkItem(ClearMessages, consumer);
                 }
             }
         }
 
+        private void ClearMessages(object value)
+        {
+            MessageConsumer consumer = value as MessageConsumer;
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("Performing Async Clear of In Progress Messages for Consumer: " + consumer.ConsumerId);
+            }
+
+            consumer.ClearMessagesInProgress();
+        }
+
         internal void Acknowledge()
         {
             lock(this.consumers.SyncRoot)

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs Thu Jul  8 15:27:15 2010
@@ -26,17 +26,27 @@ namespace Apache.NMS.Stomp.State
             return null;
         }
 
+        public virtual Response processAddSession(SessionInfo info)
+        {
+            return null;
+        }
+
         public virtual Response processAddConsumer(ConsumerInfo info)
         {
             return null;
         }
 
+        public virtual Response processAddProducer(ProducerInfo info)
+        {
+            return null;
+        }
+
         public virtual Response processKeepAliveInfo(KeepAliveInfo info)
         {
             return null;
         }
 
-        public virtual Response processMessage(Message send)
+        public virtual Response processMessage(BaseMessage send)
         {
             return null;
         }
@@ -51,11 +61,21 @@ namespace Apache.NMS.Stomp.State
             return null;
         }
 
+        public virtual Response processRemoveSession(SessionId id)
+        {
+            return null;
+        }
+
         public virtual Response processRemoveConsumer(ConsumerId id)
         {
             return null;
         }
 
+        public virtual Response processRemoveProducer(ProducerId id)
+        {
+            return null;
+        }
+
         public virtual Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info)
         {
             return null;
@@ -81,5 +101,20 @@ namespace Apache.NMS.Stomp.State
             return null;
         }
 
+        public virtual Response processBeginTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processCommitTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processRollbackTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs Thu Jul  8 15:27:15 2010
@@ -24,15 +24,23 @@ namespace Apache.NMS.Stomp.State
     {
         Response processAddConnection(ConnectionInfo info);
 
+        Response processAddSession(SessionInfo info);
+
         Response processAddConsumer(ConsumerInfo info);
 
+        Response processAddProducer(ProducerInfo info);
+
         Response processRemoveConnection(ConnectionId id);
 
+        Response processRemoveSession(SessionId id);
+
         Response processRemoveConsumer(ConsumerId id);
 
+        Response processRemoveProducer(ProducerId id);
+
         Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info);
 
-        Response processMessage(Message send);
+        Response processMessage(BaseMessage send);
 
         Response processMessageAck(MessageAck ack);
 
@@ -45,5 +53,12 @@ namespace Apache.NMS.Stomp.State
         Response processConnectionError(ConnectionError error);
 
         Response processResponse(Response response);
+
+        Response processBeginTransaction(TransactionInfo info);
+
+        Response processCommitTransaction(TransactionInfo info);
+
+        Response processRollbackTransaction(TransactionInfo info);
+
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs Thu Jul  8 15:27:15 2010
@@ -62,6 +62,15 @@ namespace Apache.NMS.Stomp
             synchronizations.Remove(synchronization);
         }
 
+        public void ResetTransactionInProgress()
+        {
+            if(InTransaction)
+            {
+                this.transactionId = null;
+                this.synchronizations.Clear();
+            }
+        }
+
         public void Begin()
         {
             if(!InTransaction)