You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/07 22:50:18 UTC

svn commit: r961505 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Connection.cs MessageConsumer.cs Session.cs State/ConnectionState.cs State/ConnectionStateTracker.cs Transport/Failover/FailoverTransport.cs

Author: tabish
Date: Wed Jul  7 20:50:17 2010
New Revision: 961505

URL: http://svn.apache.org/viewvc?rev=961505&view=rev
Log:
Update the State Tracker and Failover Transport to allow the Consumers to control when their processing of connection interruption is complete.  Perform the Consumer clear operation Asynchronously to avoid lock contention.  

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Wed Jul  7 20:50:17 2010
@@ -20,6 +20,7 @@ using System.Collections;
 using System.Threading;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Failover;
 using Apache.NMS.ActiveMQ.Util;
 using Apache.NMS.Util;
 
@@ -67,6 +68,7 @@ namespace Apache.NMS.ActiveMQ
         private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
         private ICompressionPolicy compressionPolicy = new CompressionPolicy();
         private IdGenerator clientIdGenerator;
+        private volatile CountDownLatch transportInterruptionProcessingComplete;
 
         public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
         {
@@ -641,6 +643,7 @@ namespace Apache.NMS.ActiveMQ
         {
             if(command is MessageDispatch)
             {
+                WaitForTransportInterruptionProcessingToComplete();
                 DispatchMessage((MessageDispatch) command);
             }
             else if(command is KeepAliveInfo)
@@ -767,6 +770,12 @@ namespace Apache.NMS.ActiveMQ
         {
             Tracer.Debug("Connection: Transport has been Interrupted.");
 
+            this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
+            }
+
             foreach(Session session in this.sessions)
             {
                 try
@@ -890,5 +899,62 @@ namespace Apache.NMS.ActiveMQ
             this.Oneway(command);
         }
 
+        private void WaitForTransportInterruptionProcessingToComplete()
+        {
+            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+            if(cdl != null)
+            {
+                if(!closed && cdl.Remaining > 0)
+                {
+                    Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
+                                "processing (" + cdl.Remaining + ") to complete..");
+                    cdl.await(TimeSpan.FromSeconds(10));
+                }
+
+                SignalInterruptionProcessingComplete();
+            }
+        }
+
+        internal void TransportInterruptionProcessingComplete()
+        {
+            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+            if(cdl != null)
+            {
+                cdl.countDown();
+                try
+                {
+                    SignalInterruptionProcessingComplete();
+                }
+                catch
+                {
+                }
+            }
+        }
+    
+        private void SignalInterruptionProcessingComplete()
+        {
+            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+            if(cdl.Remaining == 0)
+            {
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("transportInterruptionProcessingComplete for: " + this.info.ConnectionId);
+                }
+                this.transportInterruptionProcessingComplete = null;
+
+                FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+                if(failoverTransport != null)
+                {
+                    failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("notified failover transport (" + failoverTransport +
+                                     ") of interruption completion for: " + this.info.ConnectionId);
+                    }
+                }
+    
+            }
+        }
+
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Jul  7 20:50:17 2010
@@ -57,6 +57,7 @@ namespace Apache.NMS.ActiveMQ
 		private int dispatchedCount = 0;
 		private volatile bool synchronizationRegistered = false;
 		private bool clearDispatchList = false;
+        private bool inProgressClearRequiredFlag;
 
 		private const int DEFAULT_REDELIVERY_DELAY = 0;
 		private const int DEFAULT_MAX_REDELIVERIES = 5;
@@ -421,17 +422,38 @@ namespace Apache.NMS.ActiveMQ
 			this.unconsumedMessages.Stop();
 		}
 
-		public void ClearMessagesInProgress()
-		{
-			// we are called from inside the transport reconnection logic
-			// which involves us clearing all the connections' consumers
-			// dispatch lists and clearing them
-			// so rather than trying to grab a mutex (which could be already
-			// owned by the message listener calling the send) we will just set
-			// a flag so that the list can be cleared as soon as the
-			// dispatch thread is ready to flush the dispatch list
-			this.clearDispatchList = true;
-		}
+        internal void InProgressClearRequired()
+        {
+            inProgressClearRequiredFlag = true;
+            // deal with delivered messages async to avoid lock contention with in progress acks
+            clearDispatchList = true;
+        }
+
+        internal void ClearMessagesInProgress()
+        {
+            if(inProgressClearRequiredFlag)
+            {
+                // Called from a thread in the ThreadPool, so we wait until we can
+                // get a lock on the unconsumed list then we clear it.
+                lock(this.unconsumedMessages)
+                {
+                    if(inProgressClearRequiredFlag)
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
+                                         this.unconsumedMessages.Count + ") on transport interrupt");
+                        }
+
+                        this.unconsumedMessages.Clear();
+
+                        // allow dispatch on this connection to resume
+                        this.session.Connection.TransportInterruptionProcessingComplete();
+                        this.inProgressClearRequiredFlag = false;
+                    }
+                }
+            }
+        }
 
 		public void DeliverAcks()
 		{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Wed Jul  7 20:50:17 2010
@@ -793,15 +793,32 @@ namespace Apache.NMS.ActiveMQ
                 this.executor.ClearMessagesInProgress();
             }
 
+            // Because we are called from inside the Transport Reconnection logic
+            // we spawn the Consumer clear to another Thread so that we can avoid
+            // any lock contention that might exist between the consumer and the
+            // connection that is reconnecting.
             lock(this.consumers.SyncRoot)
             {
                 foreach(MessageConsumer consumer in this.consumers.Values)
                 {
-                    consumer.ClearMessagesInProgress();
+                    consumer.InProgressClearRequired();
+                    ThreadPool.QueueUserWorkItem(ClearMessages, consumer);
                 }
             }
         }
 
+        private void ClearMessages(object value)
+        {
+            MessageConsumer consumer = value as MessageConsumer;
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("Performing Async Clear of In Progress Messages on Consumer: " + consumer.ConsumerId);
+            }
+
+            consumer.ClearMessagesInProgress();
+        }
+
         internal void Acknowledge()
         {
             lock(this.consumers.SyncRoot)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs Wed Jul  7 20:50:17 2010
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Collections.Generic;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.Util;
 
@@ -29,6 +30,9 @@ namespace Apache.NMS.ActiveMQ.State
 		private AtomicDictionary<SessionId, SessionState> sessions = new AtomicDictionary<SessionId, SessionState>();
 		private AtomicCollection<DestinationInfo> tempDestinations = new AtomicCollection<DestinationInfo>();
 		private Atomic<bool> _shutdown = new Atomic<bool>(false);
+	    private bool connectionInterruptProcessingComplete = true;
+		private Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = 
+			new Dictionary<ConsumerId, ConsumerInfo>();
 
 		public ConnectionState(ConnectionInfo info)
 		{
@@ -75,21 +79,6 @@ namespace Apache.NMS.ActiveMQ.State
 			transactions.Add(id, new TransactionState(id));
 		}
 
-		/*
-		public TransactionState getTransactionState(TransactionId id) {
-			return transactions[id];
-		}
-
-		public SynchronizedCollection<TransactionState> getTransactionStates() {
-			return transactions.Values;
-		}
-
-		public SessionState getSessionState(SessionId id) {
-			return sessions[id];
-		}
-
-		*/
-
 		public TransactionState this[TransactionId id]
 		{
 			get
@@ -192,6 +181,17 @@ namespace Apache.NMS.ActiveMQ.State
 				throw new ApplicationException("Disposed");
 			}
 		}
+		
+		public Dictionary<ConsumerId, ConsumerInfo> RecoveringPullConsumers
+		{
+			get { return this.recoveringPullConsumers; }
+		}
+		
+		public bool ConnectionInterruptProcessingComplete
+		{
+			get { return this.connectionInterruptProcessingComplete; }
+			set { this.connectionInterruptProcessingComplete = value; }
+		}
 
 		public void shutdown()
 		{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Wed Jul  7 20:50:17 2010
@@ -18,6 +18,7 @@
 using System;
 using System.Collections.Generic;
 
+using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
 
@@ -175,10 +176,37 @@ namespace Apache.NMS.ActiveMQ.State
 		/// <param name="sessionState"></param>
 		protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
 		{
+            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
+            // recovery completes.
+
+	        ConnectionState connectionState = connectionStates[sessionState.Info.SessionId.ParentId];
+			bool connectionInterruptionProcessingComplete =
+                connectionState.ConnectionInterruptProcessingComplete;
+
 			// Restore the session's consumers
 			foreach(ConsumerState consumerState in sessionState.ConsumerStates)
 			{
-				transport.Oneway(consumerState.Info);
+                ConsumerInfo infoToSend = consumerState.Info;
+
+                if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0)
+                {
+                    infoToSend = consumerState.Info.Clone() as ConsumerInfo;
+                    connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+                    infoToSend.PrefetchSize = 0;
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("restore consumer: " + infoToSend.ConsumerId +
+                                     " in pull mode pending recovery, overriding prefetch: " +
+                                     consumerState.Info.PrefetchSize);
+                    }
+                }
+
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("restore consumer: " + infoToSend.ConsumerId);
+                }
+
+                transport.Oneway(infoToSend);
 			}
 		}
 
@@ -189,7 +217,6 @@ namespace Apache.NMS.ActiveMQ.State
 		protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
 		{
 			// Restore the session's producers
-
 			foreach(ProducerState producerState in sessionState.ProducerStates)
 			{
 				transport.Oneway(producerState.Info);
@@ -652,5 +679,49 @@ namespace Apache.NMS.ActiveMQ.State
 				_maxCacheSize = value;
 			}
 		}
+
+	    public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId) 
+		{
+	        ConnectionState connectionState = connectionStates[connectionId];
+	        if(connectionState != null) 
+			{
+	            connectionState.ConnectionInterruptProcessingComplete = true;
+	            
+				Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
+				foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
+				{
+	                ConsumerControl control = new ConsumerControl();
+	                control.ConsumerId = entry.Key;
+	                control.Prefetch = entry.Value.PrefetchSize;
+	                control.Destination = entry.Value.Destination;
+	                try 
+					{
+	                    if(Tracer.IsDebugEnabled) 
+						{
+	                        Tracer.Debug("restored recovering consumer: " + control.ConsumerId + 
+							             " with: " + control.Prefetch);
+	                    }
+	                    transport.Oneway(control);
+	                } 
+					catch(Exception ex)
+					{
+	                    if(Tracer.IsDebugEnabled) 
+						{
+	                        Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+	                                     " with: " + control.Prefetch + "Error: " + ex.Message);
+	                    }
+	                }
+	            }
+	            stalledConsumers.Clear();
+	        }
+	    }
+		
+	    public void TransportInterrupted() 
+		{
+	        foreach(ConnectionState connectionState in connectionStates.Values) 
+			{
+	            connectionState.ConnectionInterruptProcessingComplete = false;
+	        }
+	    }		
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Wed Jul  7 20:50:17 2010
@@ -370,16 +370,19 @@ namespace Apache.NMS.ActiveMQ.Transport.
                     failedConnectTransportURI = ConnectedTransportURI;
                     ConnectedTransportURI = null;
                     connected = false;
+					
+					stateTracker.TransportInterrupted();
+					
+	                if(this.Interrupted != null)
+	                {
+	                    this.Interrupted(transport);
+	                }
+					
                     if(reconnectOk)
                     {
                         reconnectTask.Wakeup();
                     }
-                }
-
-                if(this.Interrupted != null)
-                {
-                    this.Interrupted(transport);
-                }
+				}
             }
         }
 
@@ -1155,7 +1158,16 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
             return false;
         }
-        
+
+        public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
+        {
+            lock(reconnectMutex)
+            {
+                Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId);
+                stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
+            }
+        }
+
         public void Dispose()
         {
             Dispose(true);