You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/09 01:09:23 UTC

svn commit: r961976 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Commands/ConnectionInfo.cs Connection.cs OpenWire/V6/ConnectionInfoMarshaller.cs State/ConnectionStateTracker.cs

Author: tabish
Date: Thu Jul  8 23:09:22 2010
New Revision: 961976

URL: http://svn.apache.org/viewvc?rev=961976&view=rev
Log:
Update Commands and set Properties correctly.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs?rev=961976&r1=961975&r2=961976&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs Thu Jul  8 23:09:22 2010
@@ -44,6 +44,7 @@ namespace Apache.NMS.ActiveMQ.Commands
         bool manageable;
         bool clientMaster;
         bool faultTolerant;
+        bool failoverReconnect;
 
         ///
         /// <summery>
@@ -75,7 +76,8 @@ namespace Apache.NMS.ActiveMQ.Commands
                 "BrokerMasterConnector = " + BrokerMasterConnector + ", " + 
                 "Manageable = " + Manageable + ", " + 
                 "ClientMaster = " + ClientMaster + ", " + 
-                "FaultTolerant = " + FaultTolerant + " ]";
+                "FaultTolerant = " + FaultTolerant + ", " + 
+                "FailoverReconnect = " + FailoverReconnect + " ]";
         }
 
         public ConnectionId ConnectionId
@@ -132,6 +134,12 @@ namespace Apache.NMS.ActiveMQ.Commands
             set { this.faultTolerant = value; }
         }
 
+        public bool FailoverReconnect
+        {
+            get { return failoverReconnect; }
+            set { this.failoverReconnect = value; }
+        }
+
         ///
         /// <summery>
         ///  Return an answer of true to the isConnectionInfo() query.

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=961976&r1=961975&r2=961976&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Thu Jul  8 23:09:22 2010
@@ -87,6 +87,7 @@ namespace Apache.NMS.ActiveMQ
 
             this.info = new ConnectionInfo();
             this.info.ConnectionId = id;
+            this.info.FaultTolerant = transport.IsFaultTolerant;
         }
 
         ~Connection()
@@ -930,7 +931,7 @@ namespace Apache.NMS.ActiveMQ
                 }
             }
         }
-    
+
         private void SignalInterruptionProcessingComplete()
         {
             CountDownLatch cdl = this.transportInterruptionProcessingComplete;
@@ -952,7 +953,7 @@ namespace Apache.NMS.ActiveMQ
                                      ") of interruption completion for: " + this.info.ConnectionId);
                     }
                 }
-    
+
             }
         }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs?rev=961976&r1=961975&r2=961976&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs Thu Jul  8 23:09:22 2010
@@ -84,6 +84,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
             info.Manageable = bs.ReadBoolean();
             info.ClientMaster = bs.ReadBoolean();
             info.FaultTolerant = bs.ReadBoolean();
+            info.FailoverReconnect = bs.ReadBoolean();
         }
 
         //
@@ -103,6 +104,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
             bs.WriteBoolean(info.Manageable);
             bs.WriteBoolean(info.ClientMaster);
             bs.WriteBoolean(info.FaultTolerant);
+            bs.WriteBoolean(info.FailoverReconnect);
 
             return rc + 0;
         }
@@ -124,6 +126,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
             bs.ReadBoolean();
             bs.ReadBoolean();
             bs.ReadBoolean();
+            bs.ReadBoolean();
         }
 
         // 
@@ -154,6 +157,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
             info.Manageable = dataIn.ReadBoolean();
             info.ClientMaster = dataIn.ReadBoolean();
             info.FaultTolerant = dataIn.ReadBoolean();
+            info.FailoverReconnect = dataIn.ReadBoolean();
         }
 
         // 
@@ -174,6 +178,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
             dataOut.Write(info.Manageable);
             dataOut.Write(info.ClientMaster);
             dataOut.Write(info.FaultTolerant);
+            dataOut.Write(info.FailoverReconnect);
         }
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=961976&r1=961975&r2=961976&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Thu Jul  8 23:09:22 2010
@@ -24,168 +24,171 @@ using Apache.NMS.ActiveMQ.Transport;
 
 namespace Apache.NMS.ActiveMQ.State
 {
-	/// <summary>
-	/// Tracks the state of a connection so a newly established transport can be
-	/// re-initialized to the state that was tracked.
-	/// </summary>
-	public class ConnectionStateTracker : CommandVisitorAdapter
-	{
-
-		private static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
-
-		protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
-
-		private bool _trackTransactions;
-		private bool _restoreSessions = true;
-		private bool _restoreConsumers = true;
-		private bool _restoreProducers = true;
-		private bool _restoreTransaction = true;
-		private bool _trackMessages = true;
-		private int _maxCacheSize = 256;
-		private int currentCacheSize;
-		private Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();
-		private Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
-
-		protected void RemoveEldestInCache()
-		{
-			System.Collections.ICollection ic = messageCacheFIFO;
-			lock(ic.SyncRoot)
-			{
-				while(messageCacheFIFO.Count > MaxCacheSize)
-				{
-					messageCache.Remove(messageCacheFIFO.Dequeue());
-					currentCacheSize = currentCacheSize - 1;
-				}
-			}
-		}
-
-		private class RemoveTransactionAction : ThreadSimulator
-		{
-			private TransactionInfo info;
-			private ConnectionStateTracker cst;
-
-			public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
-			{
-				this.info = info;
-				this.cst = aCst;
-			}
-
-			public override void Run()
-			{
-				ConnectionId connectionId = info.ConnectionId;
-				ConnectionState cs = cst.connectionStates[connectionId];
-				cs.removeTransactionState(info.TransactionId);
-			}
-		}
-
-		/// <summary>
-		/// </summary>
-		/// <param name="command"></param>
-		/// <returns>null if the command is not state tracked.</returns>
-		public Tracked track(Command command)
-		{
-			try
-			{
-				return (Tracked) command.visit(this);
-			}
-			catch(IOException e)
-			{
-				throw e;
-			}
-			catch(Exception e)
-			{
-				throw new IOException(e.Message);
-			}
-		}
-
-		public void trackBack(Command command)
-		{
-			if(TrackMessages && command != null && command.IsMessage)
-			{
-				Message message = (Message) command;
-				if(message.TransactionId == null)
-				{
-					currentCacheSize = currentCacheSize + 1;
-				}
-			}
-		}
-
-		public void DoRestore(ITransport transport)
-		{
-			// Restore the connections.
-			foreach(ConnectionState connectionState in connectionStates.Values)
-			{
-				transport.Oneway(connectionState.Info);
-				DoRestoreTempDestinations(transport, connectionState);
-
-				if(RestoreSessions)
-				{
-					DoRestoreSessions(transport, connectionState);
-				}
-
-				if(RestoreTransaction)
-				{
-					DoRestoreTransactions(transport, connectionState);
-				}
-			}
-			//now flush messages
-			foreach(Message msg in messageCache.Values)
-			{
-				transport.Oneway(msg);
-			}
-		}
-
-		private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
-		{
-			AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
-			foreach(TransactionState transactionState in transactionStates)
-			{
-				foreach(Command command in transactionState.Commands)
-				{
-					transport.Oneway(command);
-				}
-			}
-		}
-
-		/// <summary>
-		/// </summary>
-		/// <param name="transport"></param>
-		/// <param name="connectionState"></param>
-		protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
-		{
-			// Restore the connection's sessions
-			foreach(SessionState sessionState in connectionState.SessionStates)
-			{
-				transport.Oneway(sessionState.Info);
-
-				if(RestoreProducers)
-				{
-					DoRestoreProducers(transport, sessionState);
-				}
-
-				if(RestoreConsumers)
-				{
-					DoRestoreConsumers(transport, sessionState);
-				}
-			}
-		}
-
-		/// <summary>
-		/// </summary>
-		/// <param name="transport"></param>
-		/// <param name="sessionState"></param>
-		protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
-		{
+    /// <summary>
+    /// Tracks the state of a connection so a newly established transport can be
+    /// re-initialized to the state that was tracked.
+    /// </summary>
+    public class ConnectionStateTracker : CommandVisitorAdapter
+    {
+
+        private static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+
+        protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
+
+        private bool _trackTransactions;
+        private bool _restoreSessions = true;
+        private bool _restoreConsumers = true;
+        private bool _restoreProducers = true;
+        private bool _restoreTransaction = true;
+        private bool _trackMessages = true;
+        private int _maxCacheSize = 256;
+        private int currentCacheSize;
+        private Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();
+        private Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
+
+        protected void RemoveEldestInCache()
+        {
+            System.Collections.ICollection ic = messageCacheFIFO;
+            lock(ic.SyncRoot)
+            {
+                while(messageCacheFIFO.Count > MaxCacheSize)
+                {
+                    messageCache.Remove(messageCacheFIFO.Dequeue());
+                    currentCacheSize = currentCacheSize - 1;
+                }
+            }
+        }
+
+        private class RemoveTransactionAction : ThreadSimulator
+        {
+            private TransactionInfo info;
+            private ConnectionStateTracker cst;
+
+            public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
+            {
+                this.info = info;
+                this.cst = aCst;
+            }
+
+            public override void Run()
+            {
+                ConnectionId connectionId = info.ConnectionId;
+                ConnectionState cs = cst.connectionStates[connectionId];
+                cs.removeTransactionState(info.TransactionId);
+            }
+        }
+
+        /// <summary>
+        /// </summary>
+        /// <param name="command"></param>
+        /// <returns>null if the command is not state tracked.</returns>
+        public Tracked track(Command command)
+        {
+            try
+            {
+                return (Tracked) command.visit(this);
+            }
+            catch(IOException e)
+            {
+                throw e;
+            }
+            catch(Exception e)
+            {
+                throw new IOException(e.Message);
+            }
+        }
+
+        public void trackBack(Command command)
+        {
+            if(TrackMessages && command != null && command.IsMessage)
+            {
+                Message message = (Message) command;
+                if(message.TransactionId == null)
+                {
+                    currentCacheSize = currentCacheSize + 1;
+                }
+            }
+        }
+
+        public void DoRestore(ITransport transport)
+        {
+            // Restore the connections.
+            foreach(ConnectionState connectionState in connectionStates.Values)
+            {
+                ConnectionInfo info = connectionState.Info;
+                info.FailoverReconnect = true;
+                transport.Oneway(info);
+
+                DoRestoreTempDestinations(transport, connectionState);
+
+                if(RestoreSessions)
+                {
+                    DoRestoreSessions(transport, connectionState);
+                }
+
+                if(RestoreTransaction)
+                {
+                    DoRestoreTransactions(transport, connectionState);
+                }
+            }
+            //now flush messages
+            foreach(Message msg in messageCache.Values)
+            {
+                transport.Oneway(msg);
+            }
+        }
+
+        private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
+        {
+            AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
+            foreach(TransactionState transactionState in transactionStates)
+            {
+                foreach(Command command in transactionState.Commands)
+                {
+                    transport.Oneway(command);
+                }
+            }
+        }
+
+        /// <summary>
+        /// </summary>
+        /// <param name="transport"></param>
+        /// <param name="connectionState"></param>
+        protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
+        {
+            // Restore the connection's sessions
+            foreach(SessionState sessionState in connectionState.SessionStates)
+            {
+                transport.Oneway(sessionState.Info);
+
+                if(RestoreProducers)
+                {
+                    DoRestoreProducers(transport, sessionState);
+                }
+
+                if(RestoreConsumers)
+                {
+                    DoRestoreConsumers(transport, sessionState);
+                }
+            }
+        }
+
+        /// <summary>
+        /// </summary>
+        /// <param name="transport"></param>
+        /// <param name="sessionState"></param>
+        protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
+        {
             // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
             // recovery completes.
 
-	        ConnectionState connectionState = connectionStates[sessionState.Info.SessionId.ParentId];
-			bool connectionInterruptionProcessingComplete =
+            ConnectionState connectionState = connectionStates[sessionState.Info.SessionId.ParentId];
+            bool connectionInterruptionProcessingComplete =
                 connectionState.ConnectionInterruptProcessingComplete;
 
-			// Restore the session's consumers
-			foreach(ConsumerState consumerState in sessionState.ConsumerStates)
-			{
+            // Restore the session's consumers
+            foreach(ConsumerState consumerState in sessionState.ConsumerStates)
+            {
                 ConsumerInfo infoToSend = consumerState.Info;
 
                 if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0)
@@ -207,521 +210,521 @@ namespace Apache.NMS.ActiveMQ.State
                 }
 
                 transport.Oneway(infoToSend);
-			}
-		}
+            }
+        }
+
+        /// <summary>
+        /// </summary>
+        /// <param name="transport"></param>
+        /// <param name="sessionState"></param>
+        protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
+        {
+            // Restore the session's producers
+            foreach(ProducerState producerState in sessionState.ProducerStates)
+            {
+                transport.Oneway(producerState.Info);
+            }
+        }
+
+        /// <summary>
+        /// </summary>
+        /// <param name="transport"></param>
+        /// <param name="connectionState"></param>
+        protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
+        {
+            // Restore the connection's temp destinations.
+            foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
+            {
+                transport.Oneway(destinationInfo);
+            }
+        }
+
+        public override Response processAddDestination(DestinationInfo info)
+        {
+            if(info != null)
+            {
+                ConnectionState cs = connectionStates[info.ConnectionId];
+                if(cs != null && info.Destination.IsTemporary)
+                {
+                    cs.addTempDestination(info);
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processRemoveDestination(DestinationInfo info)
+        {
+            if(info != null)
+            {
+                ConnectionState cs = connectionStates[info.ConnectionId];
+                if(cs != null && info.Destination.IsTemporary)
+                {
+                    cs.removeTempDestination(info.Destination);
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processAddProducer(ProducerInfo info)
+        {
+            if(info != null && info.ProducerId != null)
+            {
+                SessionId sessionId = info.ProducerId.ParentId;
+                if(sessionId != null)
+                {
+                    ConnectionId connectionId = sessionId.ParentId;
+                    if(connectionId != null)
+                    {
+                        ConnectionState cs = connectionStates[connectionId];
+                        if(cs != null)
+                        {
+                            SessionState ss = cs[sessionId];
+                            if(ss != null)
+                            {
+                                ss.addProducer(info);
+                            }
+                        }
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processRemoveProducer(ProducerId id)
+        {
+            if(id != null)
+            {
+                SessionId sessionId = id.ParentId;
+                if(sessionId != null)
+                {
+                    ConnectionId connectionId = sessionId.ParentId;
+                    if(connectionId != null)
+                    {
+                        ConnectionState cs = connectionStates[connectionId];
+                        if(cs != null)
+                        {
+                            SessionState ss = cs[sessionId];
+                            if(ss != null)
+                            {
+                                ss.removeProducer(id);
+                            }
+                        }
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processAddConsumer(ConsumerInfo info)
+        {
+            if(info != null)
+            {
+                SessionId sessionId = info.ConsumerId.ParentId;
+                if(sessionId != null)
+                {
+                    ConnectionId connectionId = sessionId.ParentId;
+                    if(connectionId != null)
+                    {
+                        ConnectionState cs = connectionStates[connectionId];
+                        if(cs != null)
+                        {
+                            SessionState ss = cs[sessionId];
+                            if(ss != null)
+                            {
+                                ss.addConsumer(info);
+                            }
+                        }
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processRemoveConsumer(ConsumerId id)
+        {
+            if(id != null)
+            {
+                SessionId sessionId = id.ParentId;
+                if(sessionId != null)
+                {
+                    ConnectionId connectionId = sessionId.ParentId;
+                    if(connectionId != null)
+                    {
+                        ConnectionState cs = connectionStates[connectionId];
+                        if(cs != null)
+                        {
+                            SessionState ss = cs[sessionId];
+                            if(ss != null)
+                            {
+                                ss.removeConsumer(id);
+                            }
+                        }
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processAddSession(SessionInfo info)
+        {
+            if(info != null)
+            {
+                ConnectionId connectionId = info.SessionId.ParentId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = connectionStates[connectionId];
+                    if(cs != null)
+                    {
+                        cs.addSession(info);
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processRemoveSession(SessionId id)
+        {
+            if(id != null)
+            {
+                ConnectionId connectionId = id.ParentId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = connectionStates[connectionId];
+                    if(cs != null)
+                    {
+                        cs.removeSession(id);
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processAddConnection(ConnectionInfo info)
+        {
+            if(info != null)
+            {
+                connectionStates.Add(info.ConnectionId, new ConnectionState(info));
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processRemoveConnection(ConnectionId id)
+        {
+            if(id != null)
+            {
+                connectionStates.Remove(id);
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        public override Response processMessage(Message send)
+        {
+            if(send != null)
+            {
+                if(TrackTransactions && send.TransactionId != null)
+                {
+                    ConnectionId connectionId = send.ProducerId.ParentId.ParentId;
+                    if(connectionId != null)
+                    {
+                        ConnectionState cs = connectionStates[connectionId];
+                        if(cs != null)
+                        {
+                            TransactionState transactionState = cs[send.TransactionId];
+                            if(transactionState != null)
+                            {
+                                transactionState.addCommand(send);
+                            }
+                        }
+                    }
+                    return TRACKED_RESPONSE_MARKER;
+                }
+                else if(TrackMessages)
+                {
+                    messageCache.Add(send.MessageId, (Message) send.Clone());
+                    RemoveEldestInCache();
+                }
+            }
+            return null;
+        }
+
+        public override Response processMessageAck(MessageAck ack)
+        {
+            if(TrackTransactions && ack != null && ack.TransactionId != null)
+            {
+                ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = connectionStates[connectionId];
+                    if(cs != null)
+                    {
+                        TransactionState transactionState = cs[ack.TransactionId];
+                        if(transactionState != null)
+                        {
+                            transactionState.addCommand(ack);
+                        }
+                    }
+                }
+                return TRACKED_RESPONSE_MARKER;
+            }
+            return null;
+        }
+
+        public override Response processBeginTransaction(TransactionInfo info)
+        {
+            if(TrackTransactions && info != null && info.TransactionId != null)
+            {
+                ConnectionId connectionId = info.ConnectionId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = connectionStates[connectionId];
+                    if(cs != null)
+                    {
+                        cs.addTransactionState(info.TransactionId);
+                        TransactionState state = cs[info.TransactionId];
+                        state.addCommand(info);
+                    }
+                }
+                return TRACKED_RESPONSE_MARKER;
+            }
+            return null;
+        }
+
+        public override Response processPrepareTransaction(TransactionInfo info)
+        {
+            if(TrackTransactions && info != null)
+            {
+                ConnectionId connectionId = info.ConnectionId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = connectionStates[connectionId];
+                    if(cs != null)
+                    {
+                        TransactionState transactionState = cs[info.TransactionId];
+                        if(transactionState != null)
+                        {
+                            transactionState.addCommand(info);
+                        }
+                    }
+                }
+                return TRACKED_RESPONSE_MARKER;
+            }
+            return null;
+        }
+
+        public override Response processCommitTransactionOnePhase(TransactionInfo info)
+        {
+            if(TrackTransactions && info != null)
+            {
+                ConnectionId connectionId = info.ConnectionId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = connectionStates[connectionId];
+                    if(cs != null)
+                    {
+                        TransactionState transactionState = cs[info.TransactionId];
+                        if(transactionState != null)
+                        {
+                            transactionState.addCommand(info);
+                            return new Tracked(new RemoveTransactionAction(info, this));
+                        }
+                    }
+                }
+            }
+            return null;
+        }
+
+        public override Response processCommitTransactionTwoPhase(TransactionInfo info)
+        {
+            if(TrackTransactions && info != null)
+            {
+                ConnectionId connectionId = info.ConnectionId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = connectionStates[connectionId];
+                    if(cs != null)
+                    {
+                        TransactionState transactionState = cs[info.TransactionId];
+                        if(transactionState != null)
+                        {
+                            transactionState.addCommand(info);
+                            return new Tracked(new RemoveTransactionAction(info, this));
+                        }
+                    }
+                }
+            }
+            return null;
+        }
+
+        public override Response processRollbackTransaction(TransactionInfo info)
+        {
+            if(TrackTransactions && info != null)
+            {
+                ConnectionId connectionId = info.ConnectionId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = connectionStates[connectionId];
+                    if(cs != null)
+                    {
+                        TransactionState transactionState = cs[info.TransactionId];
+                        if(transactionState != null)
+                        {
+                            transactionState.addCommand(info);
+                            return new Tracked(new RemoveTransactionAction(info, this));
+                        }
+                    }
+                }
+            }
+            return null;
+        }
+
+        public override Response processEndTransaction(TransactionInfo info)
+        {
+            if(TrackTransactions && info != null)
+            {
+                ConnectionId connectionId = info.ConnectionId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = connectionStates[connectionId];
+                    if(cs != null)
+                    {
+                        TransactionState transactionState = cs[info.TransactionId];
+                        if(transactionState != null)
+                        {
+                            transactionState.addCommand(info);
+                        }
+                    }
+                }
+                return TRACKED_RESPONSE_MARKER;
+            }
+            return null;
+        }
+
+        public bool RestoreConsumers
+        {
+            get
+            {
+                return _restoreConsumers;
+            }
+            set
+            {
+                _restoreConsumers = value;
+            }
+        }
+
+        public bool RestoreProducers
+        {
+            get
+            {
+                return _restoreProducers;
+            }
+            set
+            {
+                _restoreProducers = value;
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the restore-sessions flag.
+        /// Reads and writes go straight to the _restoreSessions backing field.
+        /// </summary>
+        /// <value>The current value of _restoreSessions.</value>
+        public bool RestoreSessions
+        {
+            // Plain pass-through accessors: no validation or side effects.
+            get { return _restoreSessions; }
+            set { _restoreSessions = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the track-transactions flag that the process*Transaction
+        /// handlers test first; it is stored in the _trackTransactions field.
+        /// </summary>
+        /// <value>The current value of _trackTransactions.</value>
+        public bool TrackTransactions
+        {
+            // Plain pass-through accessors: no validation or side effects.
+            get { return _trackTransactions; }
+            set { _trackTransactions = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the restore-transaction flag.
+        /// Reads and writes go straight to the _restoreTransaction backing field.
+        /// </summary>
+        /// <value>The current value of _restoreTransaction.</value>
+        public bool RestoreTransaction
+        {
+            // Plain pass-through accessors: no validation or side effects.
+            get { return _restoreTransaction; }
+            set { _restoreTransaction = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the track-messages flag.
+        /// Reads and writes go straight to the _trackMessages backing field.
+        /// </summary>
+        /// <value>The current value of _trackMessages.</value>
+        public bool TrackMessages
+        {
+            // Plain pass-through accessors: no validation or side effects.
+            get { return _trackMessages; }
+            set { _trackMessages = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the maximum cache size setting.
+        /// Reads and writes go straight to the _maxCacheSize backing field.
+        /// </summary>
+        /// <value>The current value of _maxCacheSize; the setter applies no range check.</value>
+        public int MaxCacheSize
+        {
+            // Plain pass-through accessors: no validation or side effects.
+            get { return _maxCacheSize; }
+            set { _maxCacheSize = value; }
+        }
+
+        public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
+        {
+            ConnectionState connectionState = connectionStates[connectionId];
+            if(connectionState != null)
+            {
+                connectionState.ConnectionInterruptProcessingComplete = true;
 
-		/// <summary>
-		/// </summary>
-		/// <param name="transport"></param>
-		/// <param name="sessionState"></param>
-		protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
-		{
-			// Restore the session's producers
-			foreach(ProducerState producerState in sessionState.ProducerStates)
-			{
-				transport.Oneway(producerState.Info);
-			}
-		}
-
-		/// <summary>
-		/// </summary>
-		/// <param name="transport"></param>
-		/// <param name="connectionState"></param>
-		protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
-		{
-			// Restore the connection's temp destinations.
-			foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
-			{
-				transport.Oneway(destinationInfo);
-			}
-		}
-
-		public override Response processAddDestination(DestinationInfo info)
-		{
-			if(info != null)
-			{
-				ConnectionState cs = connectionStates[info.ConnectionId];
-				if(cs != null && info.Destination.IsTemporary)
-				{
-					cs.addTempDestination(info);
-				}
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processRemoveDestination(DestinationInfo info)
-		{
-			if(info != null)
-			{
-				ConnectionState cs = connectionStates[info.ConnectionId];
-				if(cs != null && info.Destination.IsTemporary)
-				{
-					cs.removeTempDestination(info.Destination);
-				}
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processAddProducer(ProducerInfo info)
-		{
-			if(info != null && info.ProducerId != null)
-			{
-				SessionId sessionId = info.ProducerId.ParentId;
-				if(sessionId != null)
-				{
-					ConnectionId connectionId = sessionId.ParentId;
-					if(connectionId != null)
-					{
-						ConnectionState cs = connectionStates[connectionId];
-						if(cs != null)
-						{
-							SessionState ss = cs[sessionId];
-							if(ss != null)
-							{
-								ss.addProducer(info);
-							}
-						}
-					}
-				}
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processRemoveProducer(ProducerId id)
-		{
-			if(id != null)
-			{
-				SessionId sessionId = id.ParentId;
-				if(sessionId != null)
-				{
-					ConnectionId connectionId = sessionId.ParentId;
-					if(connectionId != null)
-					{
-						ConnectionState cs = connectionStates[connectionId];
-						if(cs != null)
-						{
-							SessionState ss = cs[sessionId];
-							if(ss != null)
-							{
-								ss.removeProducer(id);
-							}
-						}
-					}
-				}
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processAddConsumer(ConsumerInfo info)
-		{
-			if(info != null)
-			{
-				SessionId sessionId = info.ConsumerId.ParentId;
-				if(sessionId != null)
-				{
-					ConnectionId connectionId = sessionId.ParentId;
-					if(connectionId != null)
-					{
-						ConnectionState cs = connectionStates[connectionId];
-						if(cs != null)
-						{
-							SessionState ss = cs[sessionId];
-							if(ss != null)
-							{
-								ss.addConsumer(info);
-							}
-						}
-					}
-				}
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processRemoveConsumer(ConsumerId id)
-		{
-			if(id != null)
-			{
-				SessionId sessionId = id.ParentId;
-				if(sessionId != null)
-				{
-					ConnectionId connectionId = sessionId.ParentId;
-					if(connectionId != null)
-					{
-						ConnectionState cs = connectionStates[connectionId];
-						if(cs != null)
-						{
-							SessionState ss = cs[sessionId];
-							if(ss != null)
-							{
-								ss.removeConsumer(id);
-							}
-						}
-					}
-				}
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processAddSession(SessionInfo info)
-		{
-			if(info != null)
-			{
-				ConnectionId connectionId = info.SessionId.ParentId;
-				if(connectionId != null)
-				{
-					ConnectionState cs = connectionStates[connectionId];
-					if(cs != null)
-					{
-						cs.addSession(info);
-					}
-				}
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processRemoveSession(SessionId id)
-		{
-			if(id != null)
-			{
-				ConnectionId connectionId = id.ParentId;
-				if(connectionId != null)
-				{
-					ConnectionState cs = connectionStates[connectionId];
-					if(cs != null)
-					{
-						cs.removeSession(id);
-					}
-				}
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processAddConnection(ConnectionInfo info)
-		{
-			if(info != null)
-			{
-				connectionStates.Add(info.ConnectionId, new ConnectionState(info));
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processRemoveConnection(ConnectionId id)
-		{
-			if(id != null)
-			{
-				connectionStates.Remove(id);
-			}
-			return TRACKED_RESPONSE_MARKER;
-		}
-
-		public override Response processMessage(Message send)
-		{
-			if(send != null)
-			{
-				if(TrackTransactions && send.TransactionId != null)
-				{
-					ConnectionId connectionId = send.ProducerId.ParentId.ParentId;
-					if(connectionId != null)
-					{
-						ConnectionState cs = connectionStates[connectionId];
-						if(cs != null)
-						{
-							TransactionState transactionState = cs[send.TransactionId];
-							if(transactionState != null)
-							{
-								transactionState.addCommand(send);
-							}
-						}
-					}
-					return TRACKED_RESPONSE_MARKER;
-				}
-				else if(TrackMessages)
-				{
-					messageCache.Add(send.MessageId, (Message) send.Clone());
-					RemoveEldestInCache();
-				}
-			}
-			return null;
-		}
-
-		public override Response processMessageAck(MessageAck ack)
-		{
-			if(TrackTransactions && ack != null && ack.TransactionId != null)
-			{
-				ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
-				if(connectionId != null)
-				{
-					ConnectionState cs = connectionStates[connectionId];
-					if(cs != null)
-					{
-						TransactionState transactionState = cs[ack.TransactionId];
-						if(transactionState != null)
-						{
-							transactionState.addCommand(ack);
-						}
-					}
-				}
-				return TRACKED_RESPONSE_MARKER;
-			}
-			return null;
-		}
-
-		public override Response processBeginTransaction(TransactionInfo info)
-		{
-			if(TrackTransactions && info != null && info.TransactionId != null)
-			{
-				ConnectionId connectionId = info.ConnectionId;
-				if(connectionId != null)
-				{
-					ConnectionState cs = connectionStates[connectionId];
-					if(cs != null)
-					{
-						cs.addTransactionState(info.TransactionId);
-						TransactionState state = cs[info.TransactionId];
-						state.addCommand(info);
-					}
-				}
-				return TRACKED_RESPONSE_MARKER;
-			}
-			return null;
-		}
-
-		public override Response processPrepareTransaction(TransactionInfo info)
-		{
-			if(TrackTransactions && info != null)
-			{
-				ConnectionId connectionId = info.ConnectionId;
-				if(connectionId != null)
-				{
-					ConnectionState cs = connectionStates[connectionId];
-					if(cs != null)
-					{
-						TransactionState transactionState = cs[info.TransactionId];
-						if(transactionState != null)
-						{
-							transactionState.addCommand(info);
-						}
-					}
-				}
-				return TRACKED_RESPONSE_MARKER;
-			}
-			return null;
-		}
-
-		public override Response processCommitTransactionOnePhase(TransactionInfo info)
-		{
-			if(TrackTransactions && info != null)
-			{
-				ConnectionId connectionId = info.ConnectionId;
-				if(connectionId != null)
-				{
-					ConnectionState cs = connectionStates[connectionId];
-					if(cs != null)
-					{
-						TransactionState transactionState = cs[info.TransactionId];
-						if(transactionState != null)
-						{
-							transactionState.addCommand(info);
-							return new Tracked(new RemoveTransactionAction(info, this));
-						}
-					}
-				}
-			}
-			return null;
-		}
-
-		public override Response processCommitTransactionTwoPhase(TransactionInfo info)
-		{
-			if(TrackTransactions && info != null)
-			{
-				ConnectionId connectionId = info.ConnectionId;
-				if(connectionId != null)
-				{
-					ConnectionState cs = connectionStates[connectionId];
-					if(cs != null)
-					{
-						TransactionState transactionState = cs[info.TransactionId];
-						if(transactionState != null)
-						{
-							transactionState.addCommand(info);
-							return new Tracked(new RemoveTransactionAction(info, this));
-						}
-					}
-				}
-			}
-			return null;
-		}
-
-		public override Response processRollbackTransaction(TransactionInfo info)
-		{
-			if(TrackTransactions && info != null)
-			{
-				ConnectionId connectionId = info.ConnectionId;
-				if(connectionId != null)
-				{
-					ConnectionState cs = connectionStates[connectionId];
-					if(cs != null)
-					{
-						TransactionState transactionState = cs[info.TransactionId];
-						if(transactionState != null)
-						{
-							transactionState.addCommand(info);
-							return new Tracked(new RemoveTransactionAction(info, this));
-						}
-					}
-				}
-			}
-			return null;
-		}
-
-		public override Response processEndTransaction(TransactionInfo info)
-		{
-			if(TrackTransactions && info != null)
-			{
-				ConnectionId connectionId = info.ConnectionId;
-				if(connectionId != null)
-				{
-					ConnectionState cs = connectionStates[connectionId];
-					if(cs != null)
-					{
-						TransactionState transactionState = cs[info.TransactionId];
-						if(transactionState != null)
-						{
-							transactionState.addCommand(info);
-						}
-					}
-				}
-				return TRACKED_RESPONSE_MARKER;
-			}
-			return null;
-		}
-
-		public bool RestoreConsumers
-		{
-			get
-			{
-				return _restoreConsumers;
-			}
-			set
-			{
-				_restoreConsumers = value;
-			}
-		}
-
-		public bool RestoreProducers
-		{
-			get
-			{
-				return _restoreProducers;
-			}
-			set
-			{
-				_restoreProducers = value;
-			}
-		}
-
-		public bool RestoreSessions
-		{
-			get
-			{
-				return _restoreSessions;
-			}
-			set
-			{
-				_restoreSessions = value;
-			}
-		}
-
-		public bool TrackTransactions
-		{
-			get
-			{
-				return _trackTransactions;
-			}
-			set
-			{
-				_trackTransactions = value;
-			}
-		}
-
-		public bool RestoreTransaction
-		{
-			get
-			{
-				return _restoreTransaction;
-			}
-			set
-			{
-				_restoreTransaction = value;
-			}
-		}
-
-		public bool TrackMessages
-		{
-			get
-			{
-				return _trackMessages;
-			}
-			set
-			{
-				_trackMessages = value;
-			}
-		}
-
-		public int MaxCacheSize
-		{
-			get
-			{
-				return _maxCacheSize;
-			}
-			set
-			{
-				_maxCacheSize = value;
-			}
-		}
-
-	    public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId) 
-		{
-	        ConnectionState connectionState = connectionStates[connectionId];
-	        if(connectionState != null) 
-			{
-	            connectionState.ConnectionInterruptProcessingComplete = true;
-	            
-				Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
-				foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
-				{
-	                ConsumerControl control = new ConsumerControl();
-	                control.ConsumerId = entry.Key;
-	                control.Prefetch = entry.Value.PrefetchSize;
-	                control.Destination = entry.Value.Destination;
-	                try 
-					{
-	                    if(Tracer.IsDebugEnabled) 
-						{
-	                        Tracer.Debug("restored recovering consumer: " + control.ConsumerId + 
-							             " with: " + control.Prefetch);
-	                    }
-	                    transport.Oneway(control);
-	                } 
-					catch(Exception ex)
-					{
-	                    if(Tracer.IsDebugEnabled) 
-						{
-	                        Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
-	                                     " with: " + control.Prefetch + "Error: " + ex.Message);
-	                    }
-	                }
-	            }
-	            stalledConsumers.Clear();
-	        }
-	    }
-		
-	    public void TransportInterrupted() 
-		{
-	        foreach(ConnectionState connectionState in connectionStates.Values) 
-			{
-	            connectionState.ConnectionInterruptProcessingComplete = false;
-	        }
-	    }		
-	}
+                Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
+                foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
+                {
+                    ConsumerControl control = new ConsumerControl();
+                    control.ConsumerId = entry.Key;
+                    control.Prefetch = entry.Value.PrefetchSize;
+                    control.Destination = entry.Value.Destination;
+                    try
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+                                         " with: " + control.Prefetch);
+                        }
+                        transport.Oneway(control);
+                    }
+                    catch(Exception ex)
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+                                         " with: " + control.Prefetch + "Error: " + ex.Message);
+                        }
+                    }
+                }
+                stalledConsumers.Clear();
+            }
+        }
+
+        public void TransportInterrupted() // Clears the interrupt-processing-complete flag on every tracked connection.
+        {
+            foreach(ConnectionState connectionState in connectionStates.Values) // visits each ConnectionState currently held in connectionStates
+            {
+                connectionState.ConnectionInterruptProcessingComplete = false; // set back to true per connection by ConnectionInterruptProcessingComplete(...)
+            }
+        }
+    }
 }