You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/22 23:10:36 UTC
svn commit: r1096041 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src:
main/csharp/MessageConsumer.cs main/csharp/NetTxSession.cs
main/csharp/Session.cs main/csharp/TransactionContext.cs
test/csharp/DtcConsumerTransactionsTest.cs
Author: tabish
Date: Fri Apr 22 21:10:36 2011
New Revision: 1096041
URL: http://svn.apache.org/viewvc?rev=1096041&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-326
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Fri Apr 22 21:10:36 2011
@@ -763,13 +763,24 @@ namespace Apache.NMS.ActiveMQ
if(!IsAutoAcknowledgeBatch)
{
- lock(this.dispatchedMessages)
+ if (this.session.IsTransacted)
+ {
+ // In the case where the consumer is operating in concert with
+ // a distributed TX manager we need to wait whenever the TX
+ // is controlled by the DTC as it completes all operations
+ // async and we cannot start consumption again until all its
+ // tasks have completed.
+ this.session.TransactionContext.DtcWaitHandle.WaitOne();
+ }
+
+ lock(this.dispatchedMessages)
{
this.dispatchedMessages.AddFirst(dispatch);
}
if(this.session.IsTransacted)
{
+ this.session.TransactionContext.DtcWaitHandle.WaitOne();
this.AckLater(dispatch, AckType.DeliveredAck);
}
}
@@ -950,8 +961,11 @@ namespace Apache.NMS.ActiveMQ
if(this.session.IsTransacted)
{
- this.session.DoStartTransaction();
- ack.TransactionId = this.session.TransactionContext.TransactionId;
+ if (!this.session.TransactionContext.InTransaction)
+ {
+ this.session.DoStartTransaction();
+ }
+ ack.TransactionId = this.session.TransactionContext.TransactionId;
}
this.session.SendAck(ack);
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Fri Apr 22 21:10:36 2011
@@ -16,6 +16,7 @@
*/
using System;
+using System.Threading;
using System.Transactions;
using Apache.NMS.ActiveMQ.Commands;
@@ -77,7 +78,18 @@ namespace Apache.NMS.ActiveMQ
internal override void DoStartTransaction()
{
- if(!TransactionContext.InNetTransaction && Transaction.Current != null)
+ TransactionContext.SyncRoot.WaitOne();
+
+ if (TransactionContext.InNetTransaction && TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+ {
+ // To late to participate in this TX, we have to wait for it to complete then
+ // we can create a new TX and start from there.
+ TransactionContext.SyncRoot.ReleaseMutex();
+ TransactionContext.DtcWaitHandle.WaitOne();
+ TransactionContext.SyncRoot.WaitOne();
+ }
+
+ if (!TransactionContext.InNetTransaction && Transaction.Current != null)
{
Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
@@ -90,7 +102,7 @@ namespace Apache.NMS.ActiveMQ
// If an Async DTC operation is in progress such as Commit or Rollback
// we need to let it complete before deciding if the Session is in a TX
// otherwise we might error out for no reason.
- TransactionContext.DtcWaitHandle.WaitOne();
+ //TransactionContext.DtcWaitHandle.WaitOne();
if(TransactionContext.InNetTransaction)
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Apr 22 21:10:36 2011
@@ -300,17 +300,21 @@ namespace Apache.NMS.ActiveMQ
try
{
- if(transactionContext.InNetTransaction)
+ TransactionContext.SyncRoot.WaitOne();
+
+ if (transactionContext.InNetTransaction)
{
- this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));
+ TransactionContext.SyncRoot.ReleaseMutex();
+
+ //this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));)
this.transactionContext.DtcWaitHandle.WaitOne();
}
- else
- {
- Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId);
- DoClose();
- Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId);
- }
+
+ TransactionContext.SyncRoot.ReleaseMutex();
+
+ Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId);
+ DoClose();
+ Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId);
}
catch(Exception ex)
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Fri Apr 22 21:10:36 2011
@@ -199,13 +199,40 @@ namespace Apache.NMS.ActiveMQ
#region Transaction Members used when dealing with .NET System Transactions.
+ // When DTC calls prepare we must then wait for either the TX to commit, rollback or
+ // be canceled because its in doubt.
private readonly ManualResetEvent dtcControlEvent = new ManualResetEvent(true);
+ // Once the DTC calls prepare we lock this object and don't unlock it again until
+ // the TX has either completed or terminated, the users of this class should use
+ // this sync point when the TX is a DTC version as opposed to a local one.
+ private readonly Mutex syncObject = new Mutex();
+
+ public enum TxState
+ {
+ None = 0, Active = 1, Pending = 2
+ }
+
+ private TxState netTxState = TxState.None;
+
+ public Mutex SyncRoot
+ {
+ get { return this.syncObject; }
+ }
+
public bool InNetTransaction
{
get{ return this.transactionId != null && this.transactionId is XATransactionId; }
}
+ public TxState NetTxState
+ {
+ get
+ {
+ return this.netTxState;
+ }
+ }
+
public WaitHandle DtcWaitHandle
{
get { return dtcControlEvent; }
@@ -213,348 +240,375 @@ namespace Apache.NMS.ActiveMQ
public void Begin(Transaction transaction)
{
- Tracer.Debug("Begin notification received");
-
- if(InNetTransaction)
+ lock (syncObject)
{
- throw new TransactionInProgressException("A Transaction is already in Progress");
- }
+ this.netTxState = TxState.Active;
- dtcControlEvent.Reset();
+ Tracer.Debug("Begin notification received");
- try
- {
- Guid rmId = ResourceManagerGuid;
+ if (InNetTransaction)
+ {
+ throw new TransactionInProgressException("A Transaction is already in Progress");
+ }
- // Enlist this object in the transaction.
- this.currentEnlistment =
- transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
+ try
+ {
+ Guid rmId = ResourceManagerGuid;
- Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
+ // Enlist this object in the transaction.
+ this.currentEnlistment =
+ transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
- TransactionInformation txInfo = transaction.TransactionInformation;
+ Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
- XATransactionId xaId = new XATransactionId();
- this.transactionId = xaId;
+ TransactionInformation txInfo = transaction.TransactionInformation;
- if (txInfo.DistributedIdentifier != Guid.Empty)
- {
- xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
- xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
- }
- else
- {
- xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
- xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
- }
+ XATransactionId xaId = new XATransactionId();
+ this.transactionId = xaId;
+
+ if (txInfo.DistributedIdentifier != Guid.Empty)
+ {
+ xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
+ xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+ }
+ else
+ {
+ xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+ xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+ }
- // Now notify the broker that a new XA'ish transaction has started.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.Begin;
+ // Now notify the broker that a new XA'ish transaction has started.
+ TransactionInfo info = new TransactionInfo();
+ info.ConnectionId = this.connection.ConnectionId;
+ info.TransactionId = this.transactionId;
+ info.Type = (int) TransactionType.Begin;
- this.session.Connection.Oneway(info);
+ this.session.Connection.Oneway(info);
- if (Tracer.IsDebugEnabled)
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
+ }
+ }
+ catch (Exception)
{
- Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
+ dtcControlEvent.Set();
+ throw;
}
}
- catch(Exception)
- {
- dtcControlEvent.Set();
- throw;
- }
}
public void Prepare(PreparingEnlistment preparingEnlistment)
{
- try
+ lock (this.syncObject)
{
- Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
-
- BeforeEnd();
-
- // Before sending the request to the broker, log the recovery bits, if
- // this fails we can't prepare and the TX should be rolled back.
- RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
- preparingEnlistment.RecoveryInformation());
-
- // Inform the broker that work on the XA'sh TX Branch is complete.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.End;
-
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
-
- // Prepare the Transaction for commit.
- info.Type = (int) TransactionType.Prepare;
- IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
- if(response.Result == XA_READONLY)
+ this.netTxState = TxState.Pending;
+
+ try
{
- Tracer.Debug("Transaction Prepare done and doesn't need a commit, TX id: " + this.transactionId);
+ dtcControlEvent.Reset();
- this.transactionId = null;
- this.currentEnlistment = null;
+ Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
- // Read Only means there's nothing to recover because there was no
- // change on the broker.
- RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+ BeforeEnd();
- // if server responds that nothing needs to be done, then reply prepared
- // but clear the current state data so we appear done to the commit method.
- preparingEnlistment.Prepared();
+ // Before sending the request to the broker, log the recovery bits, if
+ // this fails we can't prepare and the TX should be rolled back.
+ RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
+ preparingEnlistment.RecoveryInformation());
- // Done so commit won't be called.
- AfterCommit();
+ // Inform the broker that work on the XA'sh TX Branch is complete.
+ TransactionInfo info = new TransactionInfo();
+ info.ConnectionId = this.connection.ConnectionId;
+ info.TransactionId = this.transactionId;
+ info.Type = (int) TransactionType.End;
- // A Read-Only TX is considered closed at this point, DTC won't call us again.
- this.dtcControlEvent.Set();
- }
- else
- {
- Tracer.Debug("Transaction Prepare succeeded TX id: " + this.transactionId);
+ this.connection.CheckConnected();
+ this.connection.SyncRequest(info);
- // If work finished correctly, reply prepared
- preparingEnlistment.Prepared();
- }
- }
- catch(Exception ex)
- {
- Tracer.DebugFormat("Transaction[{0}] Prepare failed with error: {1}",
- this.transactionId, ex.Message);
+ // Prepare the Transaction for commit.
+ info.Type = (int) TransactionType.Prepare;
+ IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
+ if (response.Result == XA_READONLY)
+ {
+ Tracer.Debug("Transaction Prepare done and doesn't need a commit, TX id: " + this.transactionId);
- AfterRollback();
- preparingEnlistment.ForceRollback();
- try
- {
- this.connection.OnException(ex);
+ this.transactionId = null;
+ this.currentEnlistment = null;
+
+ // Read Only means there's nothing to recover because there was no
+ // change on the broker.
+ RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+ // if server responds that nothing needs to be done, then reply prepared
+ // but clear the current state data so we appear done to the commit method.
+ preparingEnlistment.Prepared();
+
+ // Done so commit won't be called.
+ AfterCommit();
+
+ // A Read-Only TX is considered closed at this point, DTC won't call us again.
+ this.dtcControlEvent.Set();
+ }
+ else
+ {
+ Tracer.Debug("Transaction Prepare succeeded TX id: " + this.transactionId);
+
+ // If work finished correctly, reply prepared
+ preparingEnlistment.Prepared();
+ }
}
- catch (Exception error)
+ catch (Exception ex)
{
- Tracer.Error(error.ToString());
- }
+ Tracer.DebugFormat("Transaction[{0}] Prepare failed with error: {1}",
+ this.transactionId, ex.Message);
- this.currentEnlistment = null;
- this.transactionId = null;
- this.dtcControlEvent.Set();
+ AfterRollback();
+ preparingEnlistment.ForceRollback();
+ try
+ {
+ this.connection.OnException(ex);
+ }
+ catch (Exception error)
+ {
+ Tracer.Error(error.ToString());
+ }
+
+ this.currentEnlistment = null;
+ this.transactionId = null;
+ this.netTxState = TxState.None;
+ this.dtcControlEvent.Set();
+ }
}
}
public void Commit(Enlistment enlistment)
{
- try
+ lock (this.syncObject)
{
- Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
-
- if (this.transactionId != null)
+ try
{
- // Now notify the broker that a new XA'ish transaction has completed.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.CommitTwoPhase;
+ Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
+ if (this.transactionId != null)
+ {
+ // Now notify the broker that a new XA'ish transaction has completed.
+ TransactionInfo info = new TransactionInfo();
+ info.ConnectionId = this.connection.ConnectionId;
+ info.TransactionId = this.transactionId;
+ info.Type = (int) TransactionType.CommitTwoPhase;
- Tracer.Debug("Transaction Commit Done TX id: " + this.transactionId);
+ this.connection.CheckConnected();
+ this.connection.SyncRequest(info);
- RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+ Tracer.Debug("Transaction Commit Done TX id: " + this.transactionId);
- // if server responds that nothing needs to be done, then reply done.
- enlistment.Done();
+ RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+ // if server responds that nothing needs to be done, then reply done.
+ enlistment.Done();
- AfterCommit();
+ AfterCommit();
+ }
}
- }
- catch(Exception ex)
- {
- Tracer.DebugFormat("Transaction[{0}] Commit failed with error: {1}",
- this.transactionId, ex.Message);
- AfterRollback();
- try
+ catch (Exception ex)
{
- this.connection.OnException(ex);
+ Tracer.DebugFormat("Transaction[{0}] Commit failed with error: {1}",
+ this.transactionId, ex.Message);
+ AfterRollback();
+ try
+ {
+ this.connection.OnException(ex);
+ }
+ catch (Exception error)
+ {
+ Tracer.Error(error.ToString());
+ }
}
- catch (Exception error)
+ finally
{
- Tracer.Error(error.ToString());
- }
- }
- finally
- {
- this.currentEnlistment = null;
- this.transactionId = null;
+ this.currentEnlistment = null;
+ this.transactionId = null;
+ this.netTxState = TxState.None;
- CountDownLatch latch = this.recoveryComplete;
- if(latch != null)
- {
- latch.countDown();
- }
+ CountDownLatch latch = this.recoveryComplete;
+ if (latch != null)
+ {
+ latch.countDown();
+ }
- this.dtcControlEvent.Set();
+ this.dtcControlEvent.Set();
+ }
}
}
public void SinglePhaseCommit(SinglePhaseEnlistment enlistment)
{
- try
+ lock (this.syncObject)
{
- Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
-
- if (this.transactionId != null)
+ try
{
- BeforeEnd();
+ Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
- // Now notify the broker that a new XA'ish transaction has completed.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.CommitOnePhase;
+ if (this.transactionId != null)
+ {
+ BeforeEnd();
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
+ // Now notify the broker that a new XA'ish transaction has completed.
+ TransactionInfo info = new TransactionInfo();
+ info.ConnectionId = this.connection.ConnectionId;
+ info.TransactionId = this.transactionId;
+ info.Type = (int) TransactionType.CommitOnePhase;
- Tracer.Debug("Transaction Single Phase Commit Done TX id: " + this.transactionId);
+ this.connection.CheckConnected();
+ this.connection.SyncRequest(info);
- // if server responds that nothing needs to be done, then reply done.
- enlistment.Done();
+ Tracer.Debug("Transaction Single Phase Commit Done TX id: " + this.transactionId);
- AfterCommit();
+ // if server responds that nothing needs to be done, then reply done.
+ enlistment.Done();
+
+ AfterCommit();
+ }
}
- }
- catch(Exception ex)
- {
- Tracer.DebugFormat("Transaction[{0}] Single Phase Commit failed with error: {1}",
- this.transactionId, ex.Message);
- AfterRollback();
- enlistment.Done();
- try
+ catch (Exception ex)
{
- this.connection.OnException(ex);
+ Tracer.DebugFormat("Transaction[{0}] Single Phase Commit failed with error: {1}",
+ this.transactionId, ex.Message);
+ AfterRollback();
+ enlistment.Done();
+ try
+ {
+ this.connection.OnException(ex);
+ }
+ catch (Exception error)
+ {
+ Tracer.Error(error.ToString());
+ }
}
- catch (Exception error)
+ finally
{
- Tracer.Error(error.ToString());
- }
- }
- finally
- {
- this.currentEnlistment = null;
- this.transactionId = null;
+ this.currentEnlistment = null;
+ this.transactionId = null;
+ this.netTxState = TxState.None;
- this.dtcControlEvent.Set();
+ this.dtcControlEvent.Set();
+ }
}
}
public void Rollback(Enlistment enlistment)
{
- try
- {
- Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
-
- if (this.transactionId != null)
+ lock (this.syncObject)
+ {
+ try
{
- BeforeEnd();
+ Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
- // Now notify the broker that a new XA'ish transaction has started.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.End;
+ if (this.transactionId != null)
+ {
+ BeforeEnd();
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
+ // Now notify the broker that a new XA'ish transaction has started.
+ TransactionInfo info = new TransactionInfo();
+ info.ConnectionId = this.connection.ConnectionId;
+ info.TransactionId = this.transactionId;
+ info.Type = (int) TransactionType.End;
- info.Type = (int) TransactionType.Rollback;
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
+ this.connection.CheckConnected();
+ this.connection.SyncRequest(info);
- Tracer.Debug("Transaction Rollback Done TX id: " + this.transactionId);
+ info.Type = (int) TransactionType.Rollback;
+ this.connection.CheckConnected();
+ this.connection.SyncRequest(info);
- RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+ Tracer.Debug("Transaction Rollback Done TX id: " + this.transactionId);
- // if server responds that nothing needs to be done, then reply done.
- enlistment.Done();
+ RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
- AfterRollback();
+ // if server responds that nothing needs to be done, then reply done.
+ enlistment.Done();
+
+ AfterRollback();
+ }
}
- }
- catch(Exception ex)
- {
- Tracer.DebugFormat("Transaction[{0}] Rollback failed with error: {1}",
- this.transactionId, ex.Message);
- AfterRollback();
- try
+ catch (Exception ex)
{
- this.connection.OnException(ex);
+ Tracer.DebugFormat("Transaction[{0}] Rollback failed with error: {1}",
+ this.transactionId, ex.Message);
+ AfterRollback();
+ try
+ {
+ this.connection.OnException(ex);
+ }
+ catch (Exception error)
+ {
+ Tracer.Error(error.ToString());
+ }
}
- catch (Exception error)
+ finally
{
- Tracer.Error(error.ToString());
- }
- }
- finally
- {
- this.currentEnlistment = null;
- this.transactionId = null;
+ this.currentEnlistment = null;
+ this.transactionId = null;
+ this.netTxState = TxState.None;
- CountDownLatch latch = this.recoveryComplete;
- if (latch != null)
- {
- latch.countDown();
- }
+ CountDownLatch latch = this.recoveryComplete;
+ if (latch != null)
+ {
+ latch.countDown();
+ }
- this.dtcControlEvent.Set();
+ this.dtcControlEvent.Set();
+ }
}
}
public void InDoubt(Enlistment enlistment)
{
- try
+ lock (syncObject)
{
- Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
-
- BeforeEnd();
+ try
+ {
+ Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
- // Now notify the broker that Rollback should be performed.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int)TransactionType.End;
+ BeforeEnd();
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
+ // Now notify the broker that Rollback should be performed.
+ TransactionInfo info = new TransactionInfo();
+ info.ConnectionId = this.connection.ConnectionId;
+ info.TransactionId = this.transactionId;
+ info.Type = (int) TransactionType.End;
- info.Type = (int)TransactionType.Rollback;
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
+ this.connection.CheckConnected();
+ this.connection.SyncRequest(info);
- Tracer.Debug("InDoubt Transaction Rollback Done TX id: " + this.transactionId);
+ info.Type = (int) TransactionType.Rollback;
+ this.connection.CheckConnected();
+ this.connection.SyncRequest(info);
- RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+ Tracer.Debug("InDoubt Transaction Rollback Done TX id: " + this.transactionId);
- // if server responds that nothing needs to be done, then reply done.
- enlistment.Done();
+ RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
- AfterRollback();
- }
- finally
- {
- this.currentEnlistment = null;
- this.transactionId = null;
+ // if server responds that nothing needs to be done, then reply done.
+ enlistment.Done();
- CountDownLatch latch = this.recoveryComplete;
- if (latch != null)
- {
- latch.countDown();
+ AfterRollback();
}
+ finally
+ {
+ this.currentEnlistment = null;
+ this.transactionId = null;
+ this.netTxState = TxState.None;
+
+ CountDownLatch latch = this.recoveryComplete;
+ if (latch != null)
+ {
+ latch.countDown();
+ }
- this.dtcControlEvent.Set();
+ this.dtcControlEvent.Set();
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1096041&r1=1096040&r2=1096041&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs Fri Apr 22 21:10:36 2011
@@ -95,6 +95,149 @@ namespace Apache.NMS.ActiveMQ.Test
}
[Test]
+ public void TestRedeliveredCase2()
+ {
+ const int messageCount = 300;
+ const int receiveCount = 150;
+
+ // enqueue several messages
+ PurgeDatabase();
+ PurgeAndFillQueue(messageCount);
+
+ INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+ using (INetTxConnection connection = factory.CreateNetTxConnection())
+ {
+ connection.Start();
+
+ // receive half of total messages
+ for (int i = 0; i < receiveCount; i++)
+ {
+ using (INetTxSession session = connection.CreateNetTxSession())
+ {
+ IQueue queue = session.GetQueue(testQueueName);
+
+ // read message from queue and insert into db table
+ using (IMessageConsumer consumer = session.CreateConsumer(queue))
+ {
+ using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+ using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+ using (SqlCommand sqlInsertCommand = new SqlCommand())
+ {
+ sqlConnection.Open();
+ sqlInsertCommand.Connection = sqlConnection;
+
+ ITextMessage message =
+ consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage;
+ sqlInsertCommand.CommandText =
+ string.Format("INSERT INTO {0} VALUES ({1})", testTable,
+ Convert.ToInt32(message.Text));
+ sqlInsertCommand.ExecuteNonQuery();
+
+ scoped.Complete();
+ }
+ }
+
+ session.Close();
+ }
+
+ Tracer.Debug("Completed for loop iteration #" + i);
+ }
+ }
+
+ // check that others message have status redelivered = false
+ IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+ using (IConnection connection = checkFactory.CreateConnection())
+ {
+ connection.Start();
+
+ using (ISession session = connection.CreateSession())
+ using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName)))
+ {
+ IEnumerator enumerator = browser.GetEnumerator();
+
+ while (enumerator.MoveNext())
+ {
+ IMessage msg = enumerator.Current as IMessage;
+ Assert.IsNotNull(msg, "message is not in the queue!");
+ Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!");
+ }
+ }
+ }
+ }
+
+ [Test]
+ public void TestRedeliveredCase3()
+ {
+ const int messageCount = 300;
+ const int receiveCount = 150;
+
+ // enqueue several messages
+ PurgeDatabase();
+ PurgeAndFillQueue(messageCount);
+
+ INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+ using (INetTxConnection connection = factory.CreateNetTxConnection())
+ {
+ connection.Start();
+
+ // receive half of total messages
+ using (INetTxSession session = connection.CreateNetTxSession())
+ {
+ IQueue queue = session.GetQueue(testQueueName);
+
+ // read message from queue and insert into db table
+ using (IMessageConsumer consumer = session.CreateConsumer(queue))
+ {
+ for (int i = 0; i < receiveCount; i++)
+ {
+ using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+ using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+ using (SqlCommand sqlInsertCommand = new SqlCommand())
+ {
+ sqlConnection.Open();
+ sqlInsertCommand.Connection = sqlConnection;
+
+ ITextMessage message =
+ consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage;
+ sqlInsertCommand.CommandText =
+ string.Format("INSERT INTO {0} VALUES ({1})", testTable,
+ Convert.ToInt32(message.Text));
+ sqlInsertCommand.ExecuteNonQuery();
+ scoped.Complete();
+ }
+ }
+ }
+
+ session.Close();
+ }
+ }
+
+ Tracer.Debug("First stage ok");
+
+ // check that others message have status redelivered = false
+ IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+ using (IConnection connection = checkFactory.CreateConnection())
+ {
+ connection.Start();
+
+ using (ISession session = connection.CreateSession())
+ using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName)))
+ {
+ IEnumerator enumerator = browser.GetEnumerator();
+
+ while (enumerator.MoveNext())
+ {
+ IMessage msg = enumerator.Current as IMessage;
+ Assert.IsNotNull(msg, "message is not in the queue!");
+ Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!");
+ }
+ }
+ }
+ }
+
+ [Test]
public void TestRecoveryAfterCommitFailsBeforeSent()
{
// Test initialize - Fills in queue with data to send and clears the DB.