You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/01/20 20:54:22 UTC
svn commit: r1061482 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp:
Connection.cs NetTxSession.cs TransactionContext.cs
Author: tabish
Date: Thu Jan 20 19:54:22 2011
New Revision: 1061482
URL: http://svn.apache.org/viewvc?rev=1061482&view=rev
Log:
https://issues.apache.org/jira/browse/AMQNET-290
Several changes needed for handling transaction recovery after a distributed transaction fails in the commit or rollback phase.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1061482&r1=1061481&r2=1061482&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Thu Jan 20 19:54:22 2011
@@ -16,6 +16,7 @@
*/
using System;
+using System.Diagnostics;
using System.Collections;
using System.Threading;
using Apache.NMS.ActiveMQ.Commands;
@@ -73,6 +74,7 @@ namespace Apache.NMS.ActiveMQ
private ICompressionPolicy compressionPolicy = new CompressionPolicy();
private readonly IdGenerator clientIdGenerator;
private volatile CountDownLatch transportInterruptionProcessingComplete;
+ private volatile CountDownLatch asyncExceptionHandlerComplete;
private readonly MessageTransformation messageTransformation;
public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
@@ -524,8 +526,16 @@ namespace Apache.NMS.ActiveMQ
try
{
- Tracer.Info("Closing Connection.");
+ Tracer.Info("Connection.Close(): Closing Connection Now.");
this.closing.Value = true;
+
+ // Wait for an async exception event to complete
+ CountDownLatch latch = this.asyncExceptionHandlerComplete;
+ if (latch != null)
+ {
+ latch.await();
+ }
+
lock(sessions.SyncRoot)
{
foreach(Session session in sessions)
@@ -863,10 +873,13 @@ namespace Apache.NMS.ActiveMQ
internal void OnException(Exception error)
{
+ // Will fire an exception listener callback if there's any set.
OnAsyncException(error);
if(!this.closing.Value && !this.closed.Value)
{
+ this.asyncExceptionHandlerComplete = new CountDownLatch(1);
+
// Perform the actual work in another thread to avoid lock contention
// and allow the caller to continue on in its error cleanup.
ThreadPool.QueueUserWorkItem(AsyncOnExceptionHandler, error);
@@ -909,6 +922,8 @@ namespace Apache.NMS.ActiveMQ
Tracer.Debug("Caught Exception While disposing of Sessions: " + ex);
}
}
+
+ this.asyncExceptionHandlerComplete.countDown();
}
private void MarkTransportFailed(Exception error)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1061482&r1=1061481&r2=1061482&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Thu Jan 20 19:54:22 2011
@@ -27,6 +27,7 @@ namespace Apache.NMS.ActiveMQ
public NetTxSession(Connection connection, SessionId id)
: base(connection, id, AcknowledgementMode.AutoAcknowledge)
{
+ TransactionContext.CheckForAndRecoverFailedTransactions();
}
/// <summary>
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1061482&r1=1061481&r2=1061482&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Thu Jan 20 19:54:22 2011
@@ -16,10 +16,14 @@
*/
using System;
+using System.Runtime.Serialization;
+using System.Runtime.Serialization.Formatters.Binary;
+using System.IO;
using System.Text;
using System.Net;
using System.Transactions;
using System.Collections;
+using System.Collections.Generic;
using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ
@@ -210,7 +214,7 @@ namespace Apache.NMS.ActiveMQ
throw new TransactionInProgressException("A Transaction is already in Progress");
}
- Guid rmId = GuidFromId(this.connection.ResourceManagerId);
+ Guid rmId = ResourceManagerGuid;
// Enlist this object in the transaction.
this.currentEnlistment =
@@ -270,15 +274,15 @@ namespace Apache.NMS.ActiveMQ
IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
if(response.Result == XA_READONLY)
{
- Tracer.Debug("Transaction Prepare Reports Done: ");
+ Tracer.Debug("Transaction Prepare Reports Done with no need to Commit: ");
+
+ this.transactionId = null;
+ this.currentEnlistment = null;
// if server responds that nothing needs to be done, then reply prepared
// but clear the current state data so we appear done to the commit method.
preparingEnlistment.Prepared();
- this.transactionId = null;
- this.currentEnlistment = null;
-
// Done so commit won't be called.
AfterCommit();
}
@@ -288,6 +292,8 @@ namespace Apache.NMS.ActiveMQ
// If work finished correctly, reply prepared
preparingEnlistment.Prepared();
+
+ StoreRecoveryInformation(preparingEnlistment.RecoveryInformation());
}
}
catch(Exception ex)
@@ -295,6 +301,7 @@ namespace Apache.NMS.ActiveMQ
Tracer.Debug("Transaction Prepare failed with error: " + ex.Message);
AfterRollback();
preparingEnlistment.ForceRollback();
+ ClearStoredRecoveryInformation();
}
}
@@ -317,6 +324,8 @@ namespace Apache.NMS.ActiveMQ
Tracer.Debug("Transaction Commit Reports Done: ");
+ ClearStoredRecoveryInformation();
+
// if server responds that nothing needs to be done, then reply done.
enlistment.Done();
@@ -327,8 +336,14 @@ namespace Apache.NMS.ActiveMQ
{
Tracer.Debug("Transaction Commit failed with error: " + ex.Message);
AfterRollback();
- enlistment.Done();
- this.session.Connection.OnException(ex);
+ try
+ {
+ this.session.Connection.OnException(ex);
+ }
+ catch (Exception error)
+ {
+ Tracer.Error(error.ToString());
+ }
}
finally
{
@@ -401,6 +416,8 @@ namespace Apache.NMS.ActiveMQ
Tracer.Debug("Transaction Rollback Reports Done: ");
+ ClearStoredRecoveryInformation();
+
// if server responds that nothing needs to be done, then reply done.
enlistment.Done();
@@ -410,7 +427,6 @@ namespace Apache.NMS.ActiveMQ
{
Tracer.Debug("Transaction Rollback failed with error: " + ex.Message);
AfterRollback();
- enlistment.Done();
this.session.Connection.OnException(ex);
}
finally
@@ -443,6 +459,8 @@ namespace Apache.NMS.ActiveMQ
Tracer.Debug("InDoubt Transaction Rollback Reports Done: ");
+ ClearStoredRecoveryInformation();
+
// if server responds that nothing needs to be done, then reply done.
enlistment.Done();
@@ -455,31 +473,233 @@ namespace Apache.NMS.ActiveMQ
}
}
- public XATransactionId[] Recover()
+ #endregion
+
+ #region Distributed Transaction Recovery Bits
+
+ private object logFileLock = new object();
+
+ /// <summary>
+ /// Should be called from NetTxSession when created to check if any TX
+ /// data is stored for recovery and whether the Broker has matching info
+ /// stored. If an Transaction is found that belongs to this client and is
+ /// still alive on the Broker it will be recovered, otherwise the stored
+ /// data should be cleared.
+ /// </summary>
+ public void CheckForAndRecoverFailedTransactions()
+ {
+ RecoveryInformation info = TryOpenRecoveryInfoFile();
+ if (info == null)
+ {
+ Tracer.Debug("Did not detect any open DTC transaction records on disk.");
+ // No local data so anything stored on the broker can't be recovered here.
+ return;
+ }
+
+ XATransactionId[] recoverables = TryRecoverBrokerTXIds();
+ if (recoverables.Length == 0)
+ {
+ Tracer.Debug("Did not detect any recoverable transactions at Broker.");
+ // Broker has no recoverable data so nothing to do here, delete the
+ // old recovery log as its stale.
+ ClearStoredRecoveryInformation();
+ return;
+ }
+
+ XATransactionId xid = info.Xid;
+
+ foreach(XATransactionId recoverable in recoverables)
+ {
+ if(xid.Equals(recoverable))
+ {
+ Tracer.DebugFormat("Found a matching TX on Broker to stored Id: {0} reenlisting.", xid);
+
+ // Reenlist the recovered transaction with the TX Manager.
+ this.transactionId = xid;
+ this.currentEnlistment = TransactionManager.Reenlist(ResourceManagerGuid, info.TxRecoveryInfo, this);
+ TransactionManager.RecoveryComplete(ResourceManagerGuid);
+ return;
+ }
+ }
+
+ // The old recovery information doesn't match what's on the broker so we
+ // should discard it as its stale now.
+ ClearStoredRecoveryInformation();
+ }
+
+ [Serializable]
+ private sealed class RecoveryInformation
+ {
+ private byte[] txRecoveryInfo;
+ private byte[] globalTxId;
+ private byte[] branchId;
+ private int formatId;
+
+ public RecoveryInformation()
+ {
+ }
+
+ public RecoveryInformation(XATransactionId xaId, byte[] recoveryInfo)
+ {
+ this.Xid = xaId;
+ this.txRecoveryInfo = recoveryInfo;
+ }
+
+ public byte[] TxRecoveryInfo
+ {
+ get { return this.txRecoveryInfo; }
+ }
+
+ public XATransactionId Xid
+ {
+ get
+ {
+ XATransactionId xid = new XATransactionId();
+ xid.BranchQualifier = this.branchId;
+ xid.GlobalTransactionId = this.globalTxId;
+ xid.FormatId = this.formatId;
+
+ return xid;
+ }
+
+ set
+ {
+ this.branchId = value.BranchQualifier;
+ this.globalTxId = value.GlobalTransactionId;
+ this.formatId = value.FormatId;
+ }
+ }
+ }
+
+ private RecoveryInformation TryOpenRecoveryInfoFile()
+ {
+ string filename = ResourceManagerId + ".bin";
+ RecoveryInformation result = null;
+
+ Tracer.Debug("Checking for Recoverable Transactions filename: " + filename);
+
+ lock (logFileLock)
+ {
+ try
+ {
+ if (!File.Exists(filename))
+ {
+ return null;
+ }
+
+ using(FileStream recoveryLog = new FileStream(filename, FileMode.Open, FileAccess.Read))
+ {
+ Tracer.Debug("Found Recovery Log File: " + filename);
+ IFormatter formatter = new BinaryFormatter();
+ result = formatter.Deserialize(recoveryLog) as RecoveryInformation;
+ }
+ }
+ catch(Exception ex)
+ {
+ Tracer.InfoFormat("Error while opening Recovery file {0} error message: {1}", filename, ex.Message);
+ // Nothing to restore.
+ return null;
+ }
+ }
+
+ return result;
+ }
+
+ private XATransactionId[] TryRecoverBrokerTXIds()
{
+ Tracer.Debug("Checking for Recoverable Transactions on Broker.");
+
TransactionInfo info = new TransactionInfo();
info.ConnectionId = this.session.Connection.ConnectionId;
info.Type = (int)TransactionType.Recover;
this.connection.CheckConnected();
DataArrayResponse response = this.connection.SyncRequest(info) as DataArrayResponse;
-
- if(response != null && response.Data.Length > 0)
+
+ if (response != null && response.Data.Length > 0)
{
- XATransactionId[] result = null;
- if (response.Data is XATransactionId[])
+ Tracer.DebugFormat("Broker reports there are {0} recoverable XA Transactions", response.Data.Length);
+
+ List<XATransactionId> recovered = new List<XATransactionId>();
+
+ foreach (DataStructure ds in response.Data)
{
- result = new XATransactionId[response.Data.Length];
- System.Array.Copy(response.Data, result, response.Data.Length);
- return result;
+ XATransactionId xid = ds as XATransactionId;
+ if (xid != null)
+ {
+ recovered.Add(xid);
+ }
}
+
+ return recovered.ToArray();
}
return new XATransactionId[0];
}
+ private void StoreRecoveryInformation(byte[] recoveryInfo)
+ {
+ if (recoveryInfo == null || recoveryInfo.Length == 0)
+ {
+ return;
+ }
+
+ try
+ {
+ lock (logFileLock)
+ {
+ string filename = ResourceManagerId + ".bin";
+ XATransactionId xid = this.transactionId as XATransactionId;
+
+ RecoveryInformation info = new RecoveryInformation(xid, recoveryInfo);
+
+ Tracer.Debug("Serializing Recovery Info to file: " + filename);
+
+ IFormatter formatter = new BinaryFormatter();
+ using (FileStream recoveryLog = new FileStream(filename, FileMode.OpenOrCreate, FileAccess.Write))
+ {
+ formatter.Serialize(recoveryLog, info);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ Tracer.Error("Error while storing TX Recovery Info, message: " + ex.Message);
+ throw;
+ }
+ }
+
+ private void ClearStoredRecoveryInformation()
+ {
+ lock (logFileLock)
+ {
+ string filename = ResourceManagerId + ".bin";
+
+ try
+ {
+ Tracer.Debug("Attempting to remove stale Recovery Info file: " + filename);
+ File.Delete(filename);
+ }
+ catch(Exception ex)
+ {
+ Tracer.Debug("Caught Exception while removing stale RecoveryInfo file: " + ex.Message);
+ return;
+ }
+ }
+ }
+
#endregion
+ public string ResourceManagerId
+ {
+ get { return GuidFromId(this.connection.ResourceManagerId).ToString(); }
+ }
+
+ internal Guid ResourceManagerGuid
+ {
+ get { return GuidFromId(this.connection.ResourceManagerId); }
+ }
+
private Guid GuidFromId(string id)
{
// Remove the ID: prefix, that's non-unique to be sure