You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/01/20 20:54:22 UTC

svn commit: r1061482 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Connection.cs NetTxSession.cs TransactionContext.cs

Author: tabish
Date: Thu Jan 20 19:54:22 2011
New Revision: 1061482

URL: http://svn.apache.org/viewvc?rev=1061482&view=rev
Log:
https://issues.apache.org/jira/browse/AMQNET-290

Several changes needed for handling transaction recovery after a distributed transaction fails in the commit or rollback phase.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1061482&r1=1061481&r2=1061482&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Thu Jan 20 19:54:22 2011
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Diagnostics;
 using System.Collections;
 using System.Threading;
 using Apache.NMS.ActiveMQ.Commands;
@@ -73,6 +74,7 @@ namespace Apache.NMS.ActiveMQ
         private ICompressionPolicy compressionPolicy = new CompressionPolicy();
         private readonly IdGenerator clientIdGenerator;
         private volatile CountDownLatch transportInterruptionProcessingComplete;
+        private volatile CountDownLatch asyncExceptionHandlerComplete;
         private readonly MessageTransformation messageTransformation;
 
         public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
@@ -524,8 +526,16 @@ namespace Apache.NMS.ActiveMQ
 
                 try
                 {
-                    Tracer.Info("Closing Connection.");
+                    Tracer.Info("Connection.Close(): Closing Connection Now.");
                     this.closing.Value = true;
+
+                    // Wait for an async exception event to complete
+                    CountDownLatch latch = this.asyncExceptionHandlerComplete;
+                    if (latch != null)
+                    {
+                        latch.await();
+                    }
+
                     lock(sessions.SyncRoot)
                     {
                         foreach(Session session in sessions)
@@ -863,10 +873,13 @@ namespace Apache.NMS.ActiveMQ
 
         internal void OnException(Exception error)
         {
+            // Will fire an exception listener callback if there's any set.
             OnAsyncException(error);
 
             if(!this.closing.Value && !this.closed.Value)
             {
+                this.asyncExceptionHandlerComplete = new CountDownLatch(1);
+
                 // Perform the actual work in another thread to avoid lock contention
                 // and allow the caller to continue on in its error cleanup.
                 ThreadPool.QueueUserWorkItem(AsyncOnExceptionHandler, error);
@@ -909,6 +922,8 @@ namespace Apache.NMS.ActiveMQ
                     Tracer.Debug("Caught Exception While disposing of Sessions: " + ex);
                 }
             }
+
+            this.asyncExceptionHandlerComplete.countDown();
         }
 
         private void MarkTransportFailed(Exception error)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1061482&r1=1061481&r2=1061482&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Thu Jan 20 19:54:22 2011
@@ -27,6 +27,7 @@ namespace Apache.NMS.ActiveMQ
         public NetTxSession(Connection connection, SessionId id)
             : base(connection, id, AcknowledgementMode.AutoAcknowledge)
         {
+            TransactionContext.CheckForAndRecoverFailedTransactions();
         }
 
         /// <summary>

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1061482&r1=1061481&r2=1061482&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Thu Jan 20 19:54:22 2011
@@ -16,10 +16,14 @@
  */
 
 using System;
+using System.Runtime.Serialization;
+using System.Runtime.Serialization.Formatters.Binary;
+using System.IO;
 using System.Text;
 using System.Net;
 using System.Transactions;
 using System.Collections;
+using System.Collections.Generic;
 using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ
@@ -210,7 +214,7 @@ namespace Apache.NMS.ActiveMQ
                 throw new TransactionInProgressException("A Transaction is already in Progress");
             }
 
-            Guid rmId = GuidFromId(this.connection.ResourceManagerId);
+            Guid rmId = ResourceManagerGuid;
 
             // Enlist this object in the transaction.
             this.currentEnlistment =
@@ -270,15 +274,15 @@ namespace Apache.NMS.ActiveMQ
                 IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
                 if(response.Result == XA_READONLY)
                 {
-                    Tracer.Debug("Transaction Prepare Reports Done: ");
+                    Tracer.Debug("Transaction Prepare Reports Done with no need to Commit: ");
+
+                    this.transactionId = null;
+                    this.currentEnlistment = null;
 
                     // if server responds that nothing needs to be done, then reply prepared
                     // but clear the current state data so we appear done to the commit method.
                     preparingEnlistment.Prepared();
 
-                    this.transactionId = null;
-                    this.currentEnlistment = null;
-
                     // Done so commit won't be called.
                     AfterCommit();
                 }
@@ -288,6 +292,8 @@ namespace Apache.NMS.ActiveMQ
 
                     // If work finished correctly, reply prepared
                     preparingEnlistment.Prepared();
+
+                    StoreRecoveryInformation(preparingEnlistment.RecoveryInformation());
                 }
             }
             catch(Exception ex)
@@ -295,6 +301,7 @@ namespace Apache.NMS.ActiveMQ
                 Tracer.Debug("Transaction Prepare failed with error: " + ex.Message);
                 AfterRollback();
                 preparingEnlistment.ForceRollback();
+                ClearStoredRecoveryInformation();
             }
         }
 
@@ -317,6 +324,8 @@ namespace Apache.NMS.ActiveMQ
 
                     Tracer.Debug("Transaction Commit Reports Done: ");
 
+                    ClearStoredRecoveryInformation();
+
                     // if server responds that nothing needs to be done, then reply done.
                     enlistment.Done();
 
@@ -327,8 +336,14 @@ namespace Apache.NMS.ActiveMQ
             {
                 Tracer.Debug("Transaction Commit failed with error: " + ex.Message);
                 AfterRollback();
-                enlistment.Done();
-                this.session.Connection.OnException(ex);
+                try
+                {
+                    this.session.Connection.OnException(ex);
+                }
+                catch (Exception error)
+                {
+                    Tracer.Error(error.ToString());
+                }
             }
             finally
             {
@@ -401,6 +416,8 @@ namespace Apache.NMS.ActiveMQ
 
                 Tracer.Debug("Transaction Rollback Reports Done: ");
 
+                ClearStoredRecoveryInformation();
+
                 // if server responds that nothing needs to be done, then reply done.
                 enlistment.Done();
 
@@ -410,7 +427,6 @@ namespace Apache.NMS.ActiveMQ
             {
                 Tracer.Debug("Transaction Rollback failed with error: " + ex.Message);
                 AfterRollback();
-                enlistment.Done();
                 this.session.Connection.OnException(ex);
             }
             finally
@@ -443,6 +459,8 @@ namespace Apache.NMS.ActiveMQ
 
                 Tracer.Debug("InDoubt Transaction Rollback Reports Done: ");
 
+                ClearStoredRecoveryInformation();
+
                 // if server responds that nothing needs to be done, then reply done.
                 enlistment.Done();
 
@@ -455,31 +473,233 @@ namespace Apache.NMS.ActiveMQ
             }
         }
 
-        public XATransactionId[] Recover()
+        #endregion
+
+        #region Distributed Transaction Recovery Bits
+
+        private object logFileLock = new object();
+
+        /// <summary>
+        /// Should be called from NetTxSession when created to check if any TX
+        /// data is stored for recovery and whether the Broker has matching info
+        /// stored.  If an Transaction is found that belongs to this client and is
+        /// still alive on the Broker it will be recovered, otherwise the stored 
+        /// data should be cleared.
+        /// </summary>
+        public void CheckForAndRecoverFailedTransactions()
+        {
+            RecoveryInformation info = TryOpenRecoveryInfoFile();
+            if (info == null)
+            {
+                Tracer.Debug("Did not detect any open DTC transaction records on disk.");
+                // No local data so anything stored on the broker can't be recovered here.
+                return;
+            }
+
+            XATransactionId[] recoverables = TryRecoverBrokerTXIds();
+            if (recoverables.Length == 0)
+            {
+                Tracer.Debug("Did not detect any recoverable transactions at Broker.");
+                // Broker has no recoverable data so nothing to do here, delete the 
+                // old recovery log as its stale.
+                ClearStoredRecoveryInformation();
+                return;
+            }
+
+            XATransactionId xid = info.Xid;
+
+            foreach(XATransactionId recoverable in recoverables)
+            {
+                if(xid.Equals(recoverable))
+                {
+                    Tracer.DebugFormat("Found a matching TX on Broker to stored Id: {0} reenlisting.", xid);
+
+                    // Reenlist the recovered transaction with the TX Manager.
+                    this.transactionId = xid;
+                    this.currentEnlistment = TransactionManager.Reenlist(ResourceManagerGuid, info.TxRecoveryInfo, this);
+                    TransactionManager.RecoveryComplete(ResourceManagerGuid);
+                    return;
+                }
+            }
+
+            // The old recovery information doesn't match what's on the broker so we
+            // should discard it as its stale now.
+            ClearStoredRecoveryInformation();
+        }
+
+        [Serializable]
+        private sealed class RecoveryInformation
+        {
+            private byte[] txRecoveryInfo;
+            private byte[] globalTxId;
+            private byte[] branchId;
+            private int formatId;
+
+            public RecoveryInformation()
+            {
+            }
+
+            public RecoveryInformation(XATransactionId xaId, byte[] recoveryInfo)
+            {
+                this.Xid = xaId;
+                this.txRecoveryInfo = recoveryInfo;
+            }
+
+            public byte[] TxRecoveryInfo
+            {
+                get { return this.txRecoveryInfo; }
+            }
+
+            public XATransactionId Xid
+            {
+                get
+                {
+                    XATransactionId xid = new XATransactionId();
+                    xid.BranchQualifier = this.branchId;
+                    xid.GlobalTransactionId = this.globalTxId;
+                    xid.FormatId = this.formatId;
+
+                    return xid;
+                }
+
+                set
+                {
+                    this.branchId = value.BranchQualifier;
+                    this.globalTxId = value.GlobalTransactionId;
+                    this.formatId = value.FormatId;
+                }
+            }
+        }
+
+        private RecoveryInformation TryOpenRecoveryInfoFile()
+        {
+            string filename = ResourceManagerId + ".bin";
+            RecoveryInformation result = null;
+
+            Tracer.Debug("Checking for Recoverable Transactions filename: " + filename);
+
+            lock (logFileLock)
+            {
+                try
+                {
+                    if (!File.Exists(filename))
+                    {
+                        return null;
+                    }
+
+                    using(FileStream recoveryLog = new FileStream(filename, FileMode.Open, FileAccess.Read))
+                    {
+                        Tracer.Debug("Found Recovery Log File: " + filename);
+                        IFormatter formatter = new BinaryFormatter();
+                        result = formatter.Deserialize(recoveryLog) as RecoveryInformation;
+                    }
+                }
+                catch(Exception ex)
+                {
+                    Tracer.InfoFormat("Error while opening Recovery file {0} error message: {1}", filename, ex.Message);
+                    // Nothing to restore.
+                    return null;
+                }
+            }
+
+            return result;
+        }
+
+        private XATransactionId[] TryRecoverBrokerTXIds()
         {
+            Tracer.Debug("Checking for Recoverable Transactions on Broker.");
+
             TransactionInfo info = new TransactionInfo();
             info.ConnectionId = this.session.Connection.ConnectionId;
             info.Type = (int)TransactionType.Recover;
 
             this.connection.CheckConnected();
             DataArrayResponse response = this.connection.SyncRequest(info) as DataArrayResponse;
-            
-            if(response != null && response.Data.Length > 0)
+
+            if (response != null && response.Data.Length > 0)
             {
-                XATransactionId[] result = null;
-                if (response.Data is XATransactionId[])
+                Tracer.DebugFormat("Broker reports there are {0} recoverable XA Transactions", response.Data.Length);
+
+                List<XATransactionId> recovered = new List<XATransactionId>();
+
+                foreach (DataStructure ds in response.Data)
                 {
-                    result = new XATransactionId[response.Data.Length];
-                    System.Array.Copy(response.Data, result, response.Data.Length);
-                    return result;
+                    XATransactionId xid = ds as XATransactionId;
+                    if (xid != null)
+                    {
+                        recovered.Add(xid);
+                    }
                 }
+
+                return recovered.ToArray();
             }
 
             return new XATransactionId[0];
         }
 
+        private void StoreRecoveryInformation(byte[] recoveryInfo)
+        {
+            if (recoveryInfo == null || recoveryInfo.Length == 0)
+            {
+                return;
+            }
+
+            try
+            {
+                lock (logFileLock)
+                {
+                    string filename = ResourceManagerId + ".bin";
+                    XATransactionId xid = this.transactionId as XATransactionId;
+
+                    RecoveryInformation info = new RecoveryInformation(xid, recoveryInfo);
+
+                    Tracer.Debug("Serializing Recovery Info to file: " + filename);
+
+                    IFormatter formatter = new BinaryFormatter();
+                    using (FileStream recoveryLog = new FileStream(filename, FileMode.OpenOrCreate, FileAccess.Write))
+                    {
+                        formatter.Serialize(recoveryLog, info);
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                Tracer.Error("Error while storing TX Recovery Info, message: " + ex.Message);
+                throw;
+            }
+        }
+
+        private void ClearStoredRecoveryInformation()
+        {
+            lock (logFileLock)
+            {
+                string filename = ResourceManagerId + ".bin";
+
+                try
+                {
+                    Tracer.Debug("Attempting to remove stale Recovery Info file: " + filename);
+                    File.Delete(filename);
+                }
+                catch(Exception ex)
+                {
+                    Tracer.Debug("Caught Exception while removing stale RecoveryInfo file: " + ex.Message);
+                    return;
+                }
+            }
+        }
+
         #endregion
 
+        public string ResourceManagerId
+        {
+            get { return GuidFromId(this.connection.ResourceManagerId).ToString(); }
+        }
+
+        internal Guid ResourceManagerGuid
+        {
+            get { return GuidFromId(this.connection.ResourceManagerId); }
+        }
+
         private Guid GuidFromId(string id)
         {
             // Remove the ID: prefix, that's non-unique to be sure