You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/22 23:10:36 UTC

svn commit: r1096041 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/MessageConsumer.cs main/csharp/NetTxSession.cs main/csharp/Session.cs main/csharp/TransactionContext.cs test/csharp/DtcConsumerTransactionsTest.cs

Author: tabish
Date: Fri Apr 22 21:10:36 2011
New Revision: 1096041

URL: http://svn.apache.org/viewvc?rev=1096041&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-326

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Fri Apr 22 21:10:36 2011
@@ -763,13 +763,24 @@ namespace Apache.NMS.ActiveMQ
 
 			if(!IsAutoAcknowledgeBatch)
 			{
-				lock(this.dispatchedMessages)
+                if (this.session.IsTransacted)
+                {
+                    // In the case where the consumer is operating in concert with 
+                    // a distributed TX manager we need to wait whenever the TX
+                    // is controlled by the DTC as it completes all operations
+                    // async and we cannot start consumption again until all its
+                    // tasks have completed.
+                    this.session.TransactionContext.DtcWaitHandle.WaitOne();
+                }
+
+			    lock(this.dispatchedMessages)
 				{
 					this.dispatchedMessages.AddFirst(dispatch);
 				}
 
 				if(this.session.IsTransacted)
 				{
+				    this.session.TransactionContext.DtcWaitHandle.WaitOne();
 					this.AckLater(dispatch, AckType.DeliveredAck);
 				}
 			}
@@ -950,8 +961,11 @@ namespace Apache.NMS.ActiveMQ
 
 				if(this.session.IsTransacted)
 				{
-					this.session.DoStartTransaction();
-					ack.TransactionId = this.session.TransactionContext.TransactionId;
+                    if (!this.session.TransactionContext.InTransaction)
+                    {
+                        this.session.DoStartTransaction();
+                    }
+				    ack.TransactionId = this.session.TransactionContext.TransactionId;
 				}
 
 				this.session.SendAck(ack);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Fri Apr 22 21:10:36 2011
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Threading;
 using System.Transactions;
 using Apache.NMS.ActiveMQ.Commands;
 
@@ -77,7 +78,18 @@ namespace Apache.NMS.ActiveMQ
 
         internal override void DoStartTransaction()
         {
-            if(!TransactionContext.InNetTransaction && Transaction.Current != null)
+            TransactionContext.SyncRoot.WaitOne();
+
+            if (TransactionContext.InNetTransaction && TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+            {
+                // To late to participate in this TX, we have to wait for it to complete then
+                // we can create a new TX and start from there.
+                TransactionContext.SyncRoot.ReleaseMutex();
+                TransactionContext.DtcWaitHandle.WaitOne();
+                TransactionContext.SyncRoot.WaitOne();
+            }
+
+            if (!TransactionContext.InNetTransaction && Transaction.Current != null)
             {
                 Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
 
@@ -90,7 +102,7 @@ namespace Apache.NMS.ActiveMQ
             // If an Async DTC operation is in progress such as Commit or Rollback
             // we need to let it complete before deciding if the Session is in a TX
             // otherwise we might error out for no reason.
-            TransactionContext.DtcWaitHandle.WaitOne();
+            //TransactionContext.DtcWaitHandle.WaitOne();
 
             if(TransactionContext.InNetTransaction)
             {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Apr 22 21:10:36 2011
@@ -300,17 +300,21 @@ namespace Apache.NMS.ActiveMQ
 
             try
             {
-                if(transactionContext.InNetTransaction)
+                TransactionContext.SyncRoot.WaitOne();
+                
+                if (transactionContext.InNetTransaction)
                 {
-                    this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));
+                    TransactionContext.SyncRoot.ReleaseMutex();
+
+                    //this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));)
                     this.transactionContext.DtcWaitHandle.WaitOne();
                 }
-                else
-                {
-                    Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId);
-                    DoClose();
-                    Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId);
-                }
+
+                TransactionContext.SyncRoot.ReleaseMutex();
+
+                Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId);
+                DoClose();
+                Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId);
             }
             catch(Exception ex)
             {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Fri Apr 22 21:10:36 2011
@@ -199,13 +199,40 @@ namespace Apache.NMS.ActiveMQ
 
         #region Transaction Members used when dealing with .NET System Transactions.
 
+        // When DTC calls prepare we must then wait for either the TX to commit, rollback or
+        // be canceled because its in doubt.
         private readonly ManualResetEvent dtcControlEvent = new ManualResetEvent(true);
 
+        // Once the DTC calls prepare we lock this object and don't unlock it again until
+        // the TX has either completed or terminated, the users of this class should use
+        // this sync point when the TX is a DTC version as opposed to a local one.
+        private readonly Mutex syncObject = new Mutex();
+
+	    public enum TxState
+	    {
+	        None = 0, Active = 1, Pending = 2
+	    }
+
+	    private TxState netTxState = TxState.None;
+
+        public Mutex SyncRoot
+	    {
+            get { return this.syncObject; }
+	    }
+
         public bool InNetTransaction
         {
             get{ return this.transactionId != null && this.transactionId is XATransactionId; }
         }
 
+        public TxState NetTxState
+        {
+            get
+            {
+                return this.netTxState; 
+            }
+        }
+
 	    public WaitHandle DtcWaitHandle
 	    {
             get { return dtcControlEvent; }
@@ -213,348 +240,375 @@ namespace Apache.NMS.ActiveMQ
 
         public void Begin(Transaction transaction)
         {
-            Tracer.Debug("Begin notification received");
-
-            if(InNetTransaction)
+            lock (syncObject)
             {
-                throw new TransactionInProgressException("A Transaction is already in Progress");
-            }
+                this.netTxState = TxState.Active;
 
-            dtcControlEvent.Reset();
+                Tracer.Debug("Begin notification received");
 
-            try
-            {
-                Guid rmId = ResourceManagerGuid;
+                if (InNetTransaction)
+                {
+                    throw new TransactionInProgressException("A Transaction is already in Progress");
+                }
 
-                // Enlist this object in the transaction.
-                this.currentEnlistment =
-                    transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
+                try
+                {
+                    Guid rmId = ResourceManagerGuid;
 
-                Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
+                    // Enlist this object in the transaction.
+                    this.currentEnlistment =
+                        transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
 
-                TransactionInformation txInfo = transaction.TransactionInformation;
+                    Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
 
-                XATransactionId xaId = new XATransactionId();
-                this.transactionId = xaId;
+                    TransactionInformation txInfo = transaction.TransactionInformation;
 
-                if (txInfo.DistributedIdentifier != Guid.Empty)
-                {
-                    xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
-                    xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
-                }
-                else
-                {
-                    xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
-                    xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
-                }
+                    XATransactionId xaId = new XATransactionId();
+                    this.transactionId = xaId;
+
+                    if (txInfo.DistributedIdentifier != Guid.Empty)
+                    {
+                        xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
+                        xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+                    }
+                    else
+                    {
+                        xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+                        xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+                    }
 
-                // Now notify the broker that a new XA'ish transaction has started.
-                TransactionInfo info = new TransactionInfo();
-                info.ConnectionId = this.connection.ConnectionId;
-                info.TransactionId = this.transactionId;
-                info.Type = (int) TransactionType.Begin;
+                    // Now notify the broker that a new XA'ish transaction has started.
+                    TransactionInfo info = new TransactionInfo();
+                    info.ConnectionId = this.connection.ConnectionId;
+                    info.TransactionId = this.transactionId;
+                    info.Type = (int) TransactionType.Begin;
 
-                this.session.Connection.Oneway(info);
+                    this.session.Connection.Oneway(info);
 
-                if (Tracer.IsDebugEnabled)
+                    if (Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
+                    }
+                }
+                catch (Exception)
                 {
-                    Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
+                    dtcControlEvent.Set();
+                    throw;
                 }
             }
-            catch(Exception)
-            {
-                dtcControlEvent.Set();
-                throw;
-            }
         }
 
         public void Prepare(PreparingEnlistment preparingEnlistment)
         {
-            try
+            lock (this.syncObject)
             {
-                Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
-				
-                BeforeEnd();
-
-                // Before sending the request to the broker, log the recovery bits, if
-                // this fails we can't prepare and the TX should be rolled back.
-                RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
-                                               preparingEnlistment.RecoveryInformation());
-
-	            // Inform the broker that work on the XA'sh TX Branch is complete.
-	            TransactionInfo info = new TransactionInfo();
-	            info.ConnectionId = this.connection.ConnectionId;
-	            info.TransactionId = this.transactionId;
-                info.Type = (int) TransactionType.End;
-
-                this.connection.CheckConnected();
-				this.connection.SyncRequest(info);
-
-                // Prepare the Transaction for commit.
-                info.Type = (int) TransactionType.Prepare;
-                IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
-                if(response.Result == XA_READONLY)
+                this.netTxState = TxState.Pending;
+
+                try
                 {
-                    Tracer.Debug("Transaction Prepare done and doesn't need a commit, TX id: " + this.transactionId);
+                    dtcControlEvent.Reset();
 
-                    this.transactionId = null;
-                    this.currentEnlistment = null;
+                    Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
 
-                    // Read Only means there's nothing to recover because there was no
-                    // change on the broker.
-                    RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+                    BeforeEnd();
 
-                    // if server responds that nothing needs to be done, then reply prepared
-                    // but clear the current state data so we appear done to the commit method.
-                    preparingEnlistment.Prepared();
+                    // Before sending the request to the broker, log the recovery bits, if
+                    // this fails we can't prepare and the TX should be rolled back.
+                    RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
+                                                   preparingEnlistment.RecoveryInformation());
 
-                    // Done so commit won't be called.
-                    AfterCommit();
+                    // Inform the broker that work on the XA'sh TX Branch is complete.
+                    TransactionInfo info = new TransactionInfo();
+                    info.ConnectionId = this.connection.ConnectionId;
+                    info.TransactionId = this.transactionId;
+                    info.Type = (int) TransactionType.End;
 
-                    // A Read-Only TX is considered closed at this point, DTC won't call us again.
-                    this.dtcControlEvent.Set();
-                }
-                else
-                {
-                    Tracer.Debug("Transaction Prepare succeeded TX id: " + this.transactionId);
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(info);
 
-                    // If work finished correctly, reply prepared
-                    preparingEnlistment.Prepared();
-                }
-            }
-            catch(Exception ex)
-            {
-                Tracer.DebugFormat("Transaction[{0}] Prepare failed with error: {1}",
-                                   this.transactionId, ex.Message);
+                    // Prepare the Transaction for commit.
+                    info.Type = (int) TransactionType.Prepare;
+                    IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
+                    if (response.Result == XA_READONLY)
+                    {
+                        Tracer.Debug("Transaction Prepare done and doesn't need a commit, TX id: " + this.transactionId);
 
-                AfterRollback();
-                preparingEnlistment.ForceRollback();
-                try
-                {
-                    this.connection.OnException(ex);
+                        this.transactionId = null;
+                        this.currentEnlistment = null;
+
+                        // Read Only means there's nothing to recover because there was no
+                        // change on the broker.
+                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+                        // if server responds that nothing needs to be done, then reply prepared
+                        // but clear the current state data so we appear done to the commit method.
+                        preparingEnlistment.Prepared();
+
+                        // Done so commit won't be called.
+                        AfterCommit();
+
+                        // A Read-Only TX is considered closed at this point, DTC won't call us again.
+                        this.dtcControlEvent.Set();
+                    }
+                    else
+                    {
+                        Tracer.Debug("Transaction Prepare succeeded TX id: " + this.transactionId);
+
+                        // If work finished correctly, reply prepared
+                        preparingEnlistment.Prepared();
+                    }
                 }
-                catch (Exception error)
+                catch (Exception ex)
                 {
-                    Tracer.Error(error.ToString());
-                }
+                    Tracer.DebugFormat("Transaction[{0}] Prepare failed with error: {1}",
+                                       this.transactionId, ex.Message);
 
-                this.currentEnlistment = null;
-                this.transactionId = null;
-                this.dtcControlEvent.Set();
+                    AfterRollback();
+                    preparingEnlistment.ForceRollback();
+                    try
+                    {
+                        this.connection.OnException(ex);
+                    }
+                    catch (Exception error)
+                    {
+                        Tracer.Error(error.ToString());
+                    }
+
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
+                    this.dtcControlEvent.Set();
+                }
             }
         }
 
         public void Commit(Enlistment enlistment)
         {
-            try
+            lock (this.syncObject)
             {
-                Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
-
-                if (this.transactionId != null)
+                try
                 {
-                    // Now notify the broker that a new XA'ish transaction has completed.
-                    TransactionInfo info = new TransactionInfo();
-                    info.ConnectionId = this.connection.ConnectionId;
-                    info.TransactionId = this.transactionId;
-                    info.Type = (int) TransactionType.CommitTwoPhase;
+                    Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
 
-                    this.connection.CheckConnected();
-                    this.connection.SyncRequest(info);
+                    if (this.transactionId != null)
+                    {
+                        // Now notify the broker that a new XA'ish transaction has completed.
+                        TransactionInfo info = new TransactionInfo();
+                        info.ConnectionId = this.connection.ConnectionId;
+                        info.TransactionId = this.transactionId;
+                        info.Type = (int) TransactionType.CommitTwoPhase;
 
-                    Tracer.Debug("Transaction Commit Done TX id: " + this.transactionId);
+                        this.connection.CheckConnected();
+                        this.connection.SyncRequest(info);
 
-                    RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+                        Tracer.Debug("Transaction Commit Done TX id: " + this.transactionId);
 
-                    // if server responds that nothing needs to be done, then reply done.
-                    enlistment.Done();
+                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+                        // if server responds that nothing needs to be done, then reply done.
+                        enlistment.Done();
 
-                    AfterCommit();
+                        AfterCommit();
+                    }
                 }
-            }
-            catch(Exception ex)
-            {
-                Tracer.DebugFormat("Transaction[{0}] Commit failed with error: {1}",
-                                   this.transactionId, ex.Message);
-                AfterRollback();
-                try
+                catch (Exception ex)
                 {
-                    this.connection.OnException(ex);
+                    Tracer.DebugFormat("Transaction[{0}] Commit failed with error: {1}",
+                                       this.transactionId, ex.Message);
+                    AfterRollback();
+                    try
+                    {
+                        this.connection.OnException(ex);
+                    }
+                    catch (Exception error)
+                    {
+                        Tracer.Error(error.ToString());
+                    }
                 }
-                catch (Exception error)
+                finally
                 {
-                    Tracer.Error(error.ToString());
-                }
-            }
-            finally
-            {
-                this.currentEnlistment = null;
-                this.transactionId = null;
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
 
-                CountDownLatch latch = this.recoveryComplete;
-                if(latch != null)
-                {
-                    latch.countDown();
-                }
+                    CountDownLatch latch = this.recoveryComplete;
+                    if (latch != null)
+                    {
+                        latch.countDown();
+                    }
 
-                this.dtcControlEvent.Set();
+                    this.dtcControlEvent.Set();
+                }
             }
         }
 
         public void SinglePhaseCommit(SinglePhaseEnlistment enlistment)
         {
-            try
+            lock (this.syncObject)
             {
-                Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
-
-                if (this.transactionId != null)
+                try
                 {
-                	BeforeEnd();
+                    Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
 
-					// Now notify the broker that a new XA'ish transaction has completed.
-                    TransactionInfo info = new TransactionInfo();
-                    info.ConnectionId = this.connection.ConnectionId;
-                    info.TransactionId = this.transactionId;
-                    info.Type = (int) TransactionType.CommitOnePhase;
+                    if (this.transactionId != null)
+                    {
+                        BeforeEnd();
 
-                    this.connection.CheckConnected();
-                    this.connection.SyncRequest(info);
+                        // Now notify the broker that a new XA'ish transaction has completed.
+                        TransactionInfo info = new TransactionInfo();
+                        info.ConnectionId = this.connection.ConnectionId;
+                        info.TransactionId = this.transactionId;
+                        info.Type = (int) TransactionType.CommitOnePhase;
 
-                    Tracer.Debug("Transaction Single Phase Commit Done TX id: " + this.transactionId);
+                        this.connection.CheckConnected();
+                        this.connection.SyncRequest(info);
 
-                    // if server responds that nothing needs to be done, then reply done.
-                    enlistment.Done();
+                        Tracer.Debug("Transaction Single Phase Commit Done TX id: " + this.transactionId);
 
-                    AfterCommit();
+                        // if server responds that nothing needs to be done, then reply done.
+                        enlistment.Done();
+
+                        AfterCommit();
+                    }
                 }
-            }
-            catch(Exception ex)
-            {
-                Tracer.DebugFormat("Transaction[{0}] Single Phase Commit failed with error: {1}",
-                                   this.transactionId, ex.Message);
-                AfterRollback();
-                enlistment.Done();
-                try
+                catch (Exception ex)
                 {
-                    this.connection.OnException(ex);
+                    Tracer.DebugFormat("Transaction[{0}] Single Phase Commit failed with error: {1}",
+                                       this.transactionId, ex.Message);
+                    AfterRollback();
+                    enlistment.Done();
+                    try
+                    {
+                        this.connection.OnException(ex);
+                    }
+                    catch (Exception error)
+                    {
+                        Tracer.Error(error.ToString());
+                    }
                 }
-                catch (Exception error)
+                finally
                 {
-                    Tracer.Error(error.ToString());
-                }
-            }
-            finally
-            {
-                this.currentEnlistment = null;
-                this.transactionId = null;
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
 
-                this.dtcControlEvent.Set();
+                    this.dtcControlEvent.Set();
+                }
             }
         }
 		
         public void Rollback(Enlistment enlistment)
         {
-            try
-            {                
-                Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
-
-                if (this.transactionId != null)
+            lock (this.syncObject)
+            {
+                try
                 {
-                    BeforeEnd();
+                    Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
 
-                    // Now notify the broker that a new XA'ish transaction has started.
-                    TransactionInfo info = new TransactionInfo();
-                    info.ConnectionId = this.connection.ConnectionId;
-                    info.TransactionId = this.transactionId;
-                    info.Type = (int) TransactionType.End;
+                    if (this.transactionId != null)
+                    {
+                        BeforeEnd();
 
-                    this.connection.CheckConnected();
-                    this.connection.SyncRequest(info);
+                        // Now notify the broker that a new XA'ish transaction has started.
+                        TransactionInfo info = new TransactionInfo();
+                        info.ConnectionId = this.connection.ConnectionId;
+                        info.TransactionId = this.transactionId;
+                        info.Type = (int) TransactionType.End;
 
-                    info.Type = (int) TransactionType.Rollback;
-                    this.connection.CheckConnected();
-                    this.connection.SyncRequest(info);
+                        this.connection.CheckConnected();
+                        this.connection.SyncRequest(info);
 
-                    Tracer.Debug("Transaction Rollback Done TX id: " + this.transactionId);
+                        info.Type = (int) TransactionType.Rollback;
+                        this.connection.CheckConnected();
+                        this.connection.SyncRequest(info);
 
-                    RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+                        Tracer.Debug("Transaction Rollback Done TX id: " + this.transactionId);
 
-                    // if server responds that nothing needs to be done, then reply done.
-                    enlistment.Done();
+                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
 
-                    AfterRollback();
+                        // if server responds that nothing needs to be done, then reply done.
+                        enlistment.Done();
+
+                        AfterRollback();
+                    }
                 }
-            }
-            catch(Exception ex)
-            {
-                Tracer.DebugFormat("Transaction[{0}] Rollback failed with error: {1}",
-                                   this.transactionId, ex.Message);
-                AfterRollback();
-                try
+                catch (Exception ex)
                 {
-                    this.connection.OnException(ex);
+                    Tracer.DebugFormat("Transaction[{0}] Rollback failed with error: {1}",
+                                       this.transactionId, ex.Message);
+                    AfterRollback();
+                    try
+                    {
+                        this.connection.OnException(ex);
+                    }
+                    catch (Exception error)
+                    {
+                        Tracer.Error(error.ToString());
+                    }
                 }
-                catch (Exception error)
+                finally
                 {
-                    Tracer.Error(error.ToString());
-                }
-            }
-            finally
-            {
-                this.currentEnlistment = null;
-                this.transactionId = null;
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
 
-                CountDownLatch latch = this.recoveryComplete;
-                if (latch != null)
-                {
-                    latch.countDown();
-                }
+                    CountDownLatch latch = this.recoveryComplete;
+                    if (latch != null)
+                    {
+                        latch.countDown();
+                    }
 
-                this.dtcControlEvent.Set();
+                    this.dtcControlEvent.Set();
+                }
             }
         }
 
         public void InDoubt(Enlistment enlistment)
         {
-            try
+            lock (syncObject)
             {
-                Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
-				
-                BeforeEnd();
+                try
+                {
+                    Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
 
-                // Now notify the broker that Rollback should be performed.
-                TransactionInfo info = new TransactionInfo();
-                info.ConnectionId = this.connection.ConnectionId;
-                info.TransactionId = this.transactionId;
-                info.Type = (int)TransactionType.End;
+                    BeforeEnd();
 
-                this.connection.CheckConnected();
-                this.connection.SyncRequest(info);
+                    // Now notify the broker that Rollback should be performed.
+                    TransactionInfo info = new TransactionInfo();
+                    info.ConnectionId = this.connection.ConnectionId;
+                    info.TransactionId = this.transactionId;
+                    info.Type = (int) TransactionType.End;
 
-                info.Type = (int)TransactionType.Rollback;
-                this.connection.CheckConnected();
-                this.connection.SyncRequest(info);
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(info);
 
-                Tracer.Debug("InDoubt Transaction Rollback Done TX id: " + this.transactionId);
+                    info.Type = (int) TransactionType.Rollback;
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(info);
 
-                RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+                    Tracer.Debug("InDoubt Transaction Rollback Done TX id: " + this.transactionId);
 
-                // if server responds that nothing needs to be done, then reply done.
-                enlistment.Done();
+                    RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
 
-                AfterRollback();
-            }
-            finally
-            {
-                this.currentEnlistment = null;
-                this.transactionId = null;
+                    // if server responds that nothing needs to be done, then reply done.
+                    enlistment.Done();
 
-                CountDownLatch latch = this.recoveryComplete;
-                if (latch != null)
-                {
-                    latch.countDown();
+                    AfterRollback();
                 }
+                finally
+                {
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                    this.netTxState = TxState.None;
+
+                    CountDownLatch latch = this.recoveryComplete;
+                    if (latch != null)
+                    {
+                        latch.countDown();
+                    }
 
-                this.dtcControlEvent.Set();
+                    this.dtcControlEvent.Set();
+                }
             }
         }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs Fri Apr 22 21:10:36 2011
@@ -95,6 +95,149 @@ namespace Apache.NMS.ActiveMQ.Test
         }
 
         [Test]
+        public void TestRedeliveredCase2()
+        {
+            const int messageCount = 300;
+            const int receiveCount = 150;
+
+            // enqueue several messages
+            PurgeDatabase();
+            PurgeAndFillQueue(messageCount);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.Start();
+
+                // receive half of total messages
+                for (int i = 0; i < receiveCount; i++)
+                {
+                    using (INetTxSession session = connection.CreateNetTxSession())
+                    {
+                        IQueue queue = session.GetQueue(testQueueName);
+
+                        // read message from queue and insert into db table
+                        using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                        {
+                            using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                            using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                            using (SqlCommand sqlInsertCommand = new SqlCommand())
+                            {
+                                sqlConnection.Open();
+                                sqlInsertCommand.Connection = sqlConnection;
+
+                                ITextMessage message =
+                                    consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage;
+                                sqlInsertCommand.CommandText =
+                                    string.Format("INSERT INTO {0} VALUES ({1})", testTable,
+                                                  Convert.ToInt32(message.Text));
+                                sqlInsertCommand.ExecuteNonQuery();
+
+                                scoped.Complete();
+                            }
+                        }
+
+                        session.Close();
+                    }
+
+                    Tracer.Debug("Completed for loop iteration #" + i);
+                }
+            }
+
+            // check that others message have status redelivered = false
+            IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection connection = checkFactory.CreateConnection())
+            {
+                connection.Start();
+
+                using (ISession session = connection.CreateSession())
+                using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName)))
+                {
+                    IEnumerator enumerator = browser.GetEnumerator();
+
+                    while (enumerator.MoveNext())
+                    {
+                        IMessage msg = enumerator.Current as IMessage;
+                        Assert.IsNotNull(msg, "message is not in the queue!");
+                        Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!");
+                    }
+                }
+            }
+        }
+
+        [Test]
+        public void TestRedeliveredCase3()
+        {
+            const int messageCount = 300;
+            const int receiveCount = 150;
+
+            // enqueue several messages
+            PurgeDatabase();
+            PurgeAndFillQueue(messageCount);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.Start();
+
+                // receive half of total messages
+                using (INetTxSession session = connection.CreateNetTxSession())
+                {
+                    IQueue queue = session.GetQueue(testQueueName);
+
+                    // read message from queue and insert into db table
+                    using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        for (int i = 0; i < receiveCount; i++)
+                        {
+                            using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                            using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                            using (SqlCommand sqlInsertCommand = new SqlCommand())
+                            {
+                                sqlConnection.Open();
+                                sqlInsertCommand.Connection = sqlConnection;
+
+                                ITextMessage message =
+                                    consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage;
+                                sqlInsertCommand.CommandText =
+                                    string.Format("INSERT INTO {0} VALUES ({1})", testTable,
+                                                  Convert.ToInt32(message.Text));
+                                sqlInsertCommand.ExecuteNonQuery();
+                                scoped.Complete();
+                            }
+                        }
+                    }
+
+                    session.Close();
+                }
+            }
+
+            Tracer.Debug("First stage ok");
+
+            // check that others message have status redelivered = false
+            IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection connection = checkFactory.CreateConnection())
+            {
+                connection.Start();
+
+                using (ISession session = connection.CreateSession())
+                using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName)))
+                {
+                    IEnumerator enumerator = browser.GetEnumerator();
+
+                    while (enumerator.MoveNext())
+                    {
+                        IMessage msg = enumerator.Current as IMessage;
+                        Assert.IsNotNull(msg, "message is not in the queue!");
+                        Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!");
+                    }
+                }
+            }
+        }
+
+        [Test]
         public void TestRecoveryAfterCommitFailsBeforeSent()
         {
             // Test initialize - Fills in queue with data to send and clears the DB.