You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/09 01:09:23 UTC
svn commit: r961976 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp:
Commands/ConnectionInfo.cs Connection.cs
OpenWire/V6/ConnectionInfoMarshaller.cs State/ConnectionStateTracker.cs
Author: tabish
Date: Thu Jul 8 23:09:22 2010
New Revision: 961976
URL: http://svn.apache.org/viewvc?rev=961976&view=rev
Log:
Update Commands and set Properties correctly.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs?rev=961976&r1=961975&r2=961976&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ConnectionInfo.cs Thu Jul 8 23:09:22 2010
@@ -44,6 +44,7 @@ namespace Apache.NMS.ActiveMQ.Commands
bool manageable;
bool clientMaster;
bool faultTolerant;
+ bool failoverReconnect;
///
/// <summery>
@@ -75,7 +76,8 @@ namespace Apache.NMS.ActiveMQ.Commands
"BrokerMasterConnector = " + BrokerMasterConnector + ", " +
"Manageable = " + Manageable + ", " +
"ClientMaster = " + ClientMaster + ", " +
- "FaultTolerant = " + FaultTolerant + " ]";
+ "FaultTolerant = " + FaultTolerant + ", " +
+ "FailoverReconnect = " + FailoverReconnect + " ]";
}
public ConnectionId ConnectionId
@@ -132,6 +134,12 @@ namespace Apache.NMS.ActiveMQ.Commands
set { this.faultTolerant = value; }
}
+ public bool FailoverReconnect
+ {
+ get { return failoverReconnect; }
+ set { this.failoverReconnect = value; }
+ }
+
///
/// <summery>
/// Return an answer of true to the isConnectionInfo() query.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=961976&r1=961975&r2=961976&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Thu Jul 8 23:09:22 2010
@@ -87,6 +87,7 @@ namespace Apache.NMS.ActiveMQ
this.info = new ConnectionInfo();
this.info.ConnectionId = id;
+ this.info.FaultTolerant = transport.IsFaultTolerant;
}
~Connection()
@@ -930,7 +931,7 @@ namespace Apache.NMS.ActiveMQ
}
}
}
-
+
private void SignalInterruptionProcessingComplete()
{
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
@@ -952,7 +953,7 @@ namespace Apache.NMS.ActiveMQ
") of interruption completion for: " + this.info.ConnectionId);
}
}
-
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs?rev=961976&r1=961975&r2=961976&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V6/ConnectionInfoMarshaller.cs Thu Jul 8 23:09:22 2010
@@ -84,6 +84,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
info.Manageable = bs.ReadBoolean();
info.ClientMaster = bs.ReadBoolean();
info.FaultTolerant = bs.ReadBoolean();
+ info.FailoverReconnect = bs.ReadBoolean();
}
//
@@ -103,6 +104,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
bs.WriteBoolean(info.Manageable);
bs.WriteBoolean(info.ClientMaster);
bs.WriteBoolean(info.FaultTolerant);
+ bs.WriteBoolean(info.FailoverReconnect);
return rc + 0;
}
@@ -124,6 +126,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();
+ bs.ReadBoolean();
}
//
@@ -154,6 +157,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
info.Manageable = dataIn.ReadBoolean();
info.ClientMaster = dataIn.ReadBoolean();
info.FaultTolerant = dataIn.ReadBoolean();
+ info.FailoverReconnect = dataIn.ReadBoolean();
}
//
@@ -174,6 +178,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire.V
dataOut.Write(info.Manageable);
dataOut.Write(info.ClientMaster);
dataOut.Write(info.FaultTolerant);
+ dataOut.Write(info.FailoverReconnect);
}
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=961976&r1=961975&r2=961976&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Thu Jul 8 23:09:22 2010
@@ -24,168 +24,171 @@ using Apache.NMS.ActiveMQ.Transport;
namespace Apache.NMS.ActiveMQ.State
{
- /// <summary>
- /// Tracks the state of a connection so a newly established transport can be
- /// re-initialized to the state that was tracked.
- /// </summary>
- public class ConnectionStateTracker : CommandVisitorAdapter
- {
-
- private static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
-
- protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
-
- private bool _trackTransactions;
- private bool _restoreSessions = true;
- private bool _restoreConsumers = true;
- private bool _restoreProducers = true;
- private bool _restoreTransaction = true;
- private bool _trackMessages = true;
- private int _maxCacheSize = 256;
- private int currentCacheSize;
- private Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();
- private Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
-
- protected void RemoveEldestInCache()
- {
- System.Collections.ICollection ic = messageCacheFIFO;
- lock(ic.SyncRoot)
- {
- while(messageCacheFIFO.Count > MaxCacheSize)
- {
- messageCache.Remove(messageCacheFIFO.Dequeue());
- currentCacheSize = currentCacheSize - 1;
- }
- }
- }
-
- private class RemoveTransactionAction : ThreadSimulator
- {
- private TransactionInfo info;
- private ConnectionStateTracker cst;
-
- public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
- {
- this.info = info;
- this.cst = aCst;
- }
-
- public override void Run()
- {
- ConnectionId connectionId = info.ConnectionId;
- ConnectionState cs = cst.connectionStates[connectionId];
- cs.removeTransactionState(info.TransactionId);
- }
- }
-
- /// <summary>
- /// </summary>
- /// <param name="command"></param>
- /// <returns>null if the command is not state tracked.</returns>
- public Tracked track(Command command)
- {
- try
- {
- return (Tracked) command.visit(this);
- }
- catch(IOException e)
- {
- throw e;
- }
- catch(Exception e)
- {
- throw new IOException(e.Message);
- }
- }
-
- public void trackBack(Command command)
- {
- if(TrackMessages && command != null && command.IsMessage)
- {
- Message message = (Message) command;
- if(message.TransactionId == null)
- {
- currentCacheSize = currentCacheSize + 1;
- }
- }
- }
-
- public void DoRestore(ITransport transport)
- {
- // Restore the connections.
- foreach(ConnectionState connectionState in connectionStates.Values)
- {
- transport.Oneway(connectionState.Info);
- DoRestoreTempDestinations(transport, connectionState);
-
- if(RestoreSessions)
- {
- DoRestoreSessions(transport, connectionState);
- }
-
- if(RestoreTransaction)
- {
- DoRestoreTransactions(transport, connectionState);
- }
- }
- //now flush messages
- foreach(Message msg in messageCache.Values)
- {
- transport.Oneway(msg);
- }
- }
-
- private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
- {
- AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
- foreach(TransactionState transactionState in transactionStates)
- {
- foreach(Command command in transactionState.Commands)
- {
- transport.Oneway(command);
- }
- }
- }
-
- /// <summary>
- /// </summary>
- /// <param name="transport"></param>
- /// <param name="connectionState"></param>
- protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
- {
- // Restore the connection's sessions
- foreach(SessionState sessionState in connectionState.SessionStates)
- {
- transport.Oneway(sessionState.Info);
-
- if(RestoreProducers)
- {
- DoRestoreProducers(transport, sessionState);
- }
-
- if(RestoreConsumers)
- {
- DoRestoreConsumers(transport, sessionState);
- }
- }
- }
-
- /// <summary>
- /// </summary>
- /// <param name="transport"></param>
- /// <param name="sessionState"></param>
- protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
- {
+ /// <summary>
+ /// Tracks the state of a connection so a newly established transport can be
+ /// re-initialized to the state that was tracked.
+ /// </summary>
+ public class ConnectionStateTracker : CommandVisitorAdapter
+ {
+
+ private static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+
+ protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
+
+ private bool _trackTransactions;
+ private bool _restoreSessions = true;
+ private bool _restoreConsumers = true;
+ private bool _restoreProducers = true;
+ private bool _restoreTransaction = true;
+ private bool _trackMessages = true;
+ private int _maxCacheSize = 256;
+ private int currentCacheSize;
+ private Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();
+ private Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
+
+ protected void RemoveEldestInCache()
+ {
+ System.Collections.ICollection ic = messageCacheFIFO;
+ lock(ic.SyncRoot)
+ {
+ while(messageCacheFIFO.Count > MaxCacheSize)
+ {
+ messageCache.Remove(messageCacheFIFO.Dequeue());
+ currentCacheSize = currentCacheSize - 1;
+ }
+ }
+ }
+
+ private class RemoveTransactionAction : ThreadSimulator
+ {
+ private TransactionInfo info;
+ private ConnectionStateTracker cst;
+
+ public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
+ {
+ this.info = info;
+ this.cst = aCst;
+ }
+
+ public override void Run()
+ {
+ ConnectionId connectionId = info.ConnectionId;
+ ConnectionState cs = cst.connectionStates[connectionId];
+ cs.removeTransactionState(info.TransactionId);
+ }
+ }
+
+ /// <summary>
+ /// </summary>
+ /// <param name="command"></param>
+ /// <returns>null if the command is not state tracked.</returns>
+ public Tracked track(Command command)
+ {
+ try
+ {
+ return (Tracked) command.visit(this);
+ }
+ catch(IOException e)
+ {
+ throw e;
+ }
+ catch(Exception e)
+ {
+ throw new IOException(e.Message);
+ }
+ }
+
+ public void trackBack(Command command)
+ {
+ if(TrackMessages && command != null && command.IsMessage)
+ {
+ Message message = (Message) command;
+ if(message.TransactionId == null)
+ {
+ currentCacheSize = currentCacheSize + 1;
+ }
+ }
+ }
+
+ public void DoRestore(ITransport transport)
+ {
+ // Restore the connections.
+ foreach(ConnectionState connectionState in connectionStates.Values)
+ {
+ ConnectionInfo info = connectionState.Info;
+ info.FailoverReconnect = true;
+ transport.Oneway(info);
+
+ DoRestoreTempDestinations(transport, connectionState);
+
+ if(RestoreSessions)
+ {
+ DoRestoreSessions(transport, connectionState);
+ }
+
+ if(RestoreTransaction)
+ {
+ DoRestoreTransactions(transport, connectionState);
+ }
+ }
+ //now flush messages
+ foreach(Message msg in messageCache.Values)
+ {
+ transport.Oneway(msg);
+ }
+ }
+
+ private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
+ {
+ AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
+ foreach(TransactionState transactionState in transactionStates)
+ {
+ foreach(Command command in transactionState.Commands)
+ {
+ transport.Oneway(command);
+ }
+ }
+ }
+
+ /// <summary>
+ /// </summary>
+ /// <param name="transport"></param>
+ /// <param name="connectionState"></param>
+ protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
+ {
+ // Restore the connection's sessions
+ foreach(SessionState sessionState in connectionState.SessionStates)
+ {
+ transport.Oneway(sessionState.Info);
+
+ if(RestoreProducers)
+ {
+ DoRestoreProducers(transport, sessionState);
+ }
+
+ if(RestoreConsumers)
+ {
+ DoRestoreConsumers(transport, sessionState);
+ }
+ }
+ }
+
+ /// <summary>
+ /// </summary>
+ /// <param name="transport"></param>
+ /// <param name="sessionState"></param>
+ protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
+ {
// Restore the session's consumers but possibly in pull only (prefetch 0 state) till
// recovery completes.
- ConnectionState connectionState = connectionStates[sessionState.Info.SessionId.ParentId];
- bool connectionInterruptionProcessingComplete =
+ ConnectionState connectionState = connectionStates[sessionState.Info.SessionId.ParentId];
+ bool connectionInterruptionProcessingComplete =
connectionState.ConnectionInterruptProcessingComplete;
- // Restore the session's consumers
- foreach(ConsumerState consumerState in sessionState.ConsumerStates)
- {
+ // Restore the session's consumers
+ foreach(ConsumerState consumerState in sessionState.ConsumerStates)
+ {
ConsumerInfo infoToSend = consumerState.Info;
if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0)
@@ -207,521 +210,521 @@ namespace Apache.NMS.ActiveMQ.State
}
transport.Oneway(infoToSend);
- }
- }
+ }
+ }
+
+ /// <summary>
+ /// </summary>
+ /// <param name="transport"></param>
+ /// <param name="sessionState"></param>
+ protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
+ {
+ // Restore the session's producers
+ foreach(ProducerState producerState in sessionState.ProducerStates)
+ {
+ transport.Oneway(producerState.Info);
+ }
+ }
+
+ /// <summary>
+ /// </summary>
+ /// <param name="transport"></param>
+ /// <param name="connectionState"></param>
+ protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
+ {
+ // Restore the connection's temp destinations.
+ foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
+ {
+ transport.Oneway(destinationInfo);
+ }
+ }
+
+ public override Response processAddDestination(DestinationInfo info)
+ {
+ if(info != null)
+ {
+ ConnectionState cs = connectionStates[info.ConnectionId];
+ if(cs != null && info.Destination.IsTemporary)
+ {
+ cs.addTempDestination(info);
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processRemoveDestination(DestinationInfo info)
+ {
+ if(info != null)
+ {
+ ConnectionState cs = connectionStates[info.ConnectionId];
+ if(cs != null && info.Destination.IsTemporary)
+ {
+ cs.removeTempDestination(info.Destination);
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processAddProducer(ProducerInfo info)
+ {
+ if(info != null && info.ProducerId != null)
+ {
+ SessionId sessionId = info.ProducerId.ParentId;
+ if(sessionId != null)
+ {
+ ConnectionId connectionId = sessionId.ParentId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ SessionState ss = cs[sessionId];
+ if(ss != null)
+ {
+ ss.addProducer(info);
+ }
+ }
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processRemoveProducer(ProducerId id)
+ {
+ if(id != null)
+ {
+ SessionId sessionId = id.ParentId;
+ if(sessionId != null)
+ {
+ ConnectionId connectionId = sessionId.ParentId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ SessionState ss = cs[sessionId];
+ if(ss != null)
+ {
+ ss.removeProducer(id);
+ }
+ }
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processAddConsumer(ConsumerInfo info)
+ {
+ if(info != null)
+ {
+ SessionId sessionId = info.ConsumerId.ParentId;
+ if(sessionId != null)
+ {
+ ConnectionId connectionId = sessionId.ParentId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ SessionState ss = cs[sessionId];
+ if(ss != null)
+ {
+ ss.addConsumer(info);
+ }
+ }
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processRemoveConsumer(ConsumerId id)
+ {
+ if(id != null)
+ {
+ SessionId sessionId = id.ParentId;
+ if(sessionId != null)
+ {
+ ConnectionId connectionId = sessionId.ParentId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ SessionState ss = cs[sessionId];
+ if(ss != null)
+ {
+ ss.removeConsumer(id);
+ }
+ }
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processAddSession(SessionInfo info)
+ {
+ if(info != null)
+ {
+ ConnectionId connectionId = info.SessionId.ParentId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ cs.addSession(info);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processRemoveSession(SessionId id)
+ {
+ if(id != null)
+ {
+ ConnectionId connectionId = id.ParentId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ cs.removeSession(id);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processAddConnection(ConnectionInfo info)
+ {
+ if(info != null)
+ {
+ connectionStates.Add(info.ConnectionId, new ConnectionState(info));
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processRemoveConnection(ConnectionId id)
+ {
+ if(id != null)
+ {
+ connectionStates.Remove(id);
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+
+ public override Response processMessage(Message send)
+ {
+ if(send != null)
+ {
+ if(TrackTransactions && send.TransactionId != null)
+ {
+ ConnectionId connectionId = send.ProducerId.ParentId.ParentId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ TransactionState transactionState = cs[send.TransactionId];
+ if(transactionState != null)
+ {
+ transactionState.addCommand(send);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+ else if(TrackMessages)
+ {
+ messageCache.Add(send.MessageId, (Message) send.Clone());
+ RemoveEldestInCache();
+ }
+ }
+ return null;
+ }
+
+ public override Response processMessageAck(MessageAck ack)
+ {
+ if(TrackTransactions && ack != null && ack.TransactionId != null)
+ {
+ ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ TransactionState transactionState = cs[ack.TransactionId];
+ if(transactionState != null)
+ {
+ transactionState.addCommand(ack);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
+ }
+
+ public override Response processBeginTransaction(TransactionInfo info)
+ {
+ if(TrackTransactions && info != null && info.TransactionId != null)
+ {
+ ConnectionId connectionId = info.ConnectionId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ cs.addTransactionState(info.TransactionId);
+ TransactionState state = cs[info.TransactionId];
+ state.addCommand(info);
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
+ }
+
+ public override Response processPrepareTransaction(TransactionInfo info)
+ {
+ if(TrackTransactions && info != null)
+ {
+ ConnectionId connectionId = info.ConnectionId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ TransactionState transactionState = cs[info.TransactionId];
+ if(transactionState != null)
+ {
+ transactionState.addCommand(info);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
+ }
+
+ public override Response processCommitTransactionOnePhase(TransactionInfo info)
+ {
+ if(TrackTransactions && info != null)
+ {
+ ConnectionId connectionId = info.ConnectionId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ TransactionState transactionState = cs[info.TransactionId];
+ if(transactionState != null)
+ {
+ transactionState.addCommand(info);
+ return new Tracked(new RemoveTransactionAction(info, this));
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public override Response processCommitTransactionTwoPhase(TransactionInfo info)
+ {
+ if(TrackTransactions && info != null)
+ {
+ ConnectionId connectionId = info.ConnectionId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ TransactionState transactionState = cs[info.TransactionId];
+ if(transactionState != null)
+ {
+ transactionState.addCommand(info);
+ return new Tracked(new RemoveTransactionAction(info, this));
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public override Response processRollbackTransaction(TransactionInfo info)
+ {
+ if(TrackTransactions && info != null)
+ {
+ ConnectionId connectionId = info.ConnectionId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ TransactionState transactionState = cs[info.TransactionId];
+ if(transactionState != null)
+ {
+ transactionState.addCommand(info);
+ return new Tracked(new RemoveTransactionAction(info, this));
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public override Response processEndTransaction(TransactionInfo info)
+ {
+ if(TrackTransactions && info != null)
+ {
+ ConnectionId connectionId = info.ConnectionId;
+ if(connectionId != null)
+ {
+ ConnectionState cs = connectionStates[connectionId];
+ if(cs != null)
+ {
+ TransactionState transactionState = cs[info.TransactionId];
+ if(transactionState != null)
+ {
+ transactionState.addCommand(info);
+ }
+ }
+ }
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
+ }
+
+ public bool RestoreConsumers
+ {
+ get
+ {
+ return _restoreConsumers;
+ }
+ set
+ {
+ _restoreConsumers = value;
+ }
+ }
+
+ public bool RestoreProducers
+ {
+ get
+ {
+ return _restoreProducers;
+ }
+ set
+ {
+ _restoreProducers = value;
+ }
+ }
+
+ public bool RestoreSessions
+ {
+ get
+ {
+ return _restoreSessions;
+ }
+ set
+ {
+ _restoreSessions = value;
+ }
+ }
+
+ public bool TrackTransactions
+ {
+ get
+ {
+ return _trackTransactions;
+ }
+ set
+ {
+ _trackTransactions = value;
+ }
+ }
+
+ public bool RestoreTransaction
+ {
+ get
+ {
+ return _restoreTransaction;
+ }
+ set
+ {
+ _restoreTransaction = value;
+ }
+ }
+
+ public bool TrackMessages
+ {
+ get
+ {
+ return _trackMessages;
+ }
+ set
+ {
+ _trackMessages = value;
+ }
+ }
+
+ public int MaxCacheSize
+ {
+ get
+ {
+ return _maxCacheSize;
+ }
+ set
+ {
+ _maxCacheSize = value;
+ }
+ }
+
+ public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
+ {
+ ConnectionState connectionState = connectionStates[connectionId];
+ if(connectionState != null)
+ {
+ connectionState.ConnectionInterruptProcessingComplete = true;
- /// <summary>
- /// </summary>
- /// <param name="transport"></param>
- /// <param name="sessionState"></param>
- protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
- {
- // Restore the session's producers
- foreach(ProducerState producerState in sessionState.ProducerStates)
- {
- transport.Oneway(producerState.Info);
- }
- }
-
- /// <summary>
- /// </summary>
- /// <param name="transport"></param>
- /// <param name="connectionState"></param>
- protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
- {
- // Restore the connection's temp destinations.
- foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
- {
- transport.Oneway(destinationInfo);
- }
- }
-
- public override Response processAddDestination(DestinationInfo info)
- {
- if(info != null)
- {
- ConnectionState cs = connectionStates[info.ConnectionId];
- if(cs != null && info.Destination.IsTemporary)
- {
- cs.addTempDestination(info);
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processRemoveDestination(DestinationInfo info)
- {
- if(info != null)
- {
- ConnectionState cs = connectionStates[info.ConnectionId];
- if(cs != null && info.Destination.IsTemporary)
- {
- cs.removeTempDestination(info.Destination);
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processAddProducer(ProducerInfo info)
- {
- if(info != null && info.ProducerId != null)
- {
- SessionId sessionId = info.ProducerId.ParentId;
- if(sessionId != null)
- {
- ConnectionId connectionId = sessionId.ParentId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- SessionState ss = cs[sessionId];
- if(ss != null)
- {
- ss.addProducer(info);
- }
- }
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processRemoveProducer(ProducerId id)
- {
- if(id != null)
- {
- SessionId sessionId = id.ParentId;
- if(sessionId != null)
- {
- ConnectionId connectionId = sessionId.ParentId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- SessionState ss = cs[sessionId];
- if(ss != null)
- {
- ss.removeProducer(id);
- }
- }
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processAddConsumer(ConsumerInfo info)
- {
- if(info != null)
- {
- SessionId sessionId = info.ConsumerId.ParentId;
- if(sessionId != null)
- {
- ConnectionId connectionId = sessionId.ParentId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- SessionState ss = cs[sessionId];
- if(ss != null)
- {
- ss.addConsumer(info);
- }
- }
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processRemoveConsumer(ConsumerId id)
- {
- if(id != null)
- {
- SessionId sessionId = id.ParentId;
- if(sessionId != null)
- {
- ConnectionId connectionId = sessionId.ParentId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- SessionState ss = cs[sessionId];
- if(ss != null)
- {
- ss.removeConsumer(id);
- }
- }
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processAddSession(SessionInfo info)
- {
- if(info != null)
- {
- ConnectionId connectionId = info.SessionId.ParentId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- cs.addSession(info);
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processRemoveSession(SessionId id)
- {
- if(id != null)
- {
- ConnectionId connectionId = id.ParentId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- cs.removeSession(id);
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processAddConnection(ConnectionInfo info)
- {
- if(info != null)
- {
- connectionStates.Add(info.ConnectionId, new ConnectionState(info));
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processRemoveConnection(ConnectionId id)
- {
- if(id != null)
- {
- connectionStates.Remove(id);
- }
- return TRACKED_RESPONSE_MARKER;
- }
-
- public override Response processMessage(Message send)
- {
- if(send != null)
- {
- if(TrackTransactions && send.TransactionId != null)
- {
- ConnectionId connectionId = send.ProducerId.ParentId.ParentId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- TransactionState transactionState = cs[send.TransactionId];
- if(transactionState != null)
- {
- transactionState.addCommand(send);
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
- else if(TrackMessages)
- {
- messageCache.Add(send.MessageId, (Message) send.Clone());
- RemoveEldestInCache();
- }
- }
- return null;
- }
-
- public override Response processMessageAck(MessageAck ack)
- {
- if(TrackTransactions && ack != null && ack.TransactionId != null)
- {
- ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- TransactionState transactionState = cs[ack.TransactionId];
- if(transactionState != null)
- {
- transactionState.addCommand(ack);
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
- return null;
- }
-
- public override Response processBeginTransaction(TransactionInfo info)
- {
- if(TrackTransactions && info != null && info.TransactionId != null)
- {
- ConnectionId connectionId = info.ConnectionId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- cs.addTransactionState(info.TransactionId);
- TransactionState state = cs[info.TransactionId];
- state.addCommand(info);
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
- return null;
- }
-
- public override Response processPrepareTransaction(TransactionInfo info)
- {
- if(TrackTransactions && info != null)
- {
- ConnectionId connectionId = info.ConnectionId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- TransactionState transactionState = cs[info.TransactionId];
- if(transactionState != null)
- {
- transactionState.addCommand(info);
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
- return null;
- }
-
- public override Response processCommitTransactionOnePhase(TransactionInfo info)
- {
- if(TrackTransactions && info != null)
- {
- ConnectionId connectionId = info.ConnectionId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- TransactionState transactionState = cs[info.TransactionId];
- if(transactionState != null)
- {
- transactionState.addCommand(info);
- return new Tracked(new RemoveTransactionAction(info, this));
- }
- }
- }
- }
- return null;
- }
-
- public override Response processCommitTransactionTwoPhase(TransactionInfo info)
- {
- if(TrackTransactions && info != null)
- {
- ConnectionId connectionId = info.ConnectionId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- TransactionState transactionState = cs[info.TransactionId];
- if(transactionState != null)
- {
- transactionState.addCommand(info);
- return new Tracked(new RemoveTransactionAction(info, this));
- }
- }
- }
- }
- return null;
- }
-
- public override Response processRollbackTransaction(TransactionInfo info)
- {
- if(TrackTransactions && info != null)
- {
- ConnectionId connectionId = info.ConnectionId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- TransactionState transactionState = cs[info.TransactionId];
- if(transactionState != null)
- {
- transactionState.addCommand(info);
- return new Tracked(new RemoveTransactionAction(info, this));
- }
- }
- }
- }
- return null;
- }
-
- public override Response processEndTransaction(TransactionInfo info)
- {
- if(TrackTransactions && info != null)
- {
- ConnectionId connectionId = info.ConnectionId;
- if(connectionId != null)
- {
- ConnectionState cs = connectionStates[connectionId];
- if(cs != null)
- {
- TransactionState transactionState = cs[info.TransactionId];
- if(transactionState != null)
- {
- transactionState.addCommand(info);
- }
- }
- }
- return TRACKED_RESPONSE_MARKER;
- }
- return null;
- }
-
- public bool RestoreConsumers
- {
- get
- {
- return _restoreConsumers;
- }
- set
- {
- _restoreConsumers = value;
- }
- }
-
- public bool RestoreProducers
- {
- get
- {
- return _restoreProducers;
- }
- set
- {
- _restoreProducers = value;
- }
- }
-
- public bool RestoreSessions
- {
- get
- {
- return _restoreSessions;
- }
- set
- {
- _restoreSessions = value;
- }
- }
-
- public bool TrackTransactions
- {
- get
- {
- return _trackTransactions;
- }
- set
- {
- _trackTransactions = value;
- }
- }
-
- public bool RestoreTransaction
- {
- get
- {
- return _restoreTransaction;
- }
- set
- {
- _restoreTransaction = value;
- }
- }
-
- public bool TrackMessages
- {
- get
- {
- return _trackMessages;
- }
- set
- {
- _trackMessages = value;
- }
- }
-
- public int MaxCacheSize
- {
- get
- {
- return _maxCacheSize;
- }
- set
- {
- _maxCacheSize = value;
- }
- }
-
- public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
- {
- ConnectionState connectionState = connectionStates[connectionId];
- if(connectionState != null)
- {
- connectionState.ConnectionInterruptProcessingComplete = true;
-
- Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
- foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
- {
- ConsumerControl control = new ConsumerControl();
- control.ConsumerId = entry.Key;
- control.Prefetch = entry.Value.PrefetchSize;
- control.Destination = entry.Value.Destination;
- try
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
- " with: " + control.Prefetch);
- }
- transport.Oneway(control);
- }
- catch(Exception ex)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
- " with: " + control.Prefetch + "Error: " + ex.Message);
- }
- }
- }
- stalledConsumers.Clear();
- }
- }
-
- public void TransportInterrupted()
- {
- foreach(ConnectionState connectionState in connectionStates.Values)
- {
- connectionState.ConnectionInterruptProcessingComplete = false;
- }
- }
- }
+ Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
+ foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
+ {
+ ConsumerControl control = new ConsumerControl();
+ control.ConsumerId = entry.Key;
+ control.Prefetch = entry.Value.PrefetchSize;
+ control.Destination = entry.Value.Destination;
+ try
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+ " with: " + control.Prefetch);
+ }
+ transport.Oneway(control);
+ }
+ catch(Exception ex)
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+ " with: " + control.Prefetch + "Error: " + ex.Message);
+ }
+ }
+ }
+ stalledConsumers.Clear();
+ }
+ }
+
+ public void TransportInterrupted()
+ {
+ foreach(ConnectionState connectionState in connectionStates.Values)
+ {
+ connectionState.ConnectionInterruptProcessingComplete = false;
+ }
+ }
+ }
}