You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/20 18:57:23 UTC
svn commit: r1484526 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/
Author: tabish
Date: Mon May 20 16:57:22 2013
New Revision: 1484526
URL: http://svn.apache.org/r1484526
Log:
continue to partition the DTC code into its own domain.
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Mon May 20 16:57:22 2013
@@ -1025,38 +1025,18 @@ namespace Apache.NMS.ActiveMQ
}
}
- public void BeforeMessageIsConsumed(MessageDispatch dispatch)
+ public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
{
this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
if (!IsAutoAcknowledgeBatch)
{
- if (this.session.IsTransacted)
- {
- bool waitForDtcWaitHandle = false;
- lock (this.session.TransactionContext.SyncRoot)
- {
- // In the case where the consumer is operating in concert with a
- // distributed TX manager we need to wait whenever the TX is being
- // controlled by the DTC as it completes all operations async and
- // we cannot start consumption again until all its tasks have completed.)
- waitForDtcWaitHandle = this.session.TransactionContext.InNetTransaction &&
- this.session.TransactionContext.NetTxState ==
- TransactionContext.TxState.Pending;
- }
-
- if (waitForDtcWaitHandle)
- {
- this.session.TransactionContext.DtcWaitHandle.WaitOne();
- }
- }
-
lock(this.dispatchedMessages)
{
this.dispatchedMessages.AddFirst(dispatch);
}
- if(this.session.IsTransacted)
+ if (this.session.IsTransacted)
{
if (this.transactedIndividualAck)
{
@@ -1092,7 +1072,7 @@ namespace Apache.NMS.ActiveMQ
return false;
}
- public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
+ public virtual void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
{
if(this.unconsumedMessages.Closed)
{
@@ -1603,7 +1583,7 @@ namespace Apache.NMS.ActiveMQ
}
}
- private bool IsAutoAcknowledgeBatch
+ protected bool IsAutoAcknowledgeBatch
{
get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs Mon May 20 16:57:22 2013
@@ -29,7 +29,7 @@ namespace Apache.NMS.ActiveMQ
/// The default Session creation methods of Connection are overridden here
/// to always return a TX capable session instance.
/// </summary>
- public class NetTxConnection : Connection, INetTxConnection
+ public sealed class NetTxConnection : Connection, INetTxConnection
{
private NetTxRecoveryPolicy recoveryPolicy = new NetTxRecoveryPolicy();
private Guid configuredResourceManagerId = Guid.Empty;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs Mon May 20 16:57:22 2013
@@ -23,7 +23,7 @@ using Apache.NMS.ActiveMQ.Transport;
namespace Apache.NMS.ActiveMQ
{
- public class NetTxConnectionFactory : ConnectionFactory, INetTxConnectionFactory
+ public sealed class NetTxConnectionFactory : ConnectionFactory, INetTxConnectionFactory
{
private NetTxRecoveryPolicy recoveryPolicy = new NetTxRecoveryPolicy();
private Guid configuredResourceManagerId = Guid.Empty;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxMessageConsumer.cs Mon May 20 16:57:22 2013
@@ -22,9 +22,10 @@ using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ
{
- class NetTxMessageConsumer : MessageConsumer
+ public sealed class NetTxMessageConsumer : MessageConsumer
{
private readonly NetTxSession session;
+ private readonly NetTxTransactionContext transactionContext;
internal NetTxMessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
string name, string selector, int prefetch, int maxPendingMessageCount,
@@ -33,6 +34,7 @@ namespace Apache.NMS.ActiveMQ
maxPendingMessageCount, noLocal, browser, dispatchAsync)
{
this.session = session as NetTxSession;
+ this.transactionContext = session.TransactionContext as NetTxTransactionContext;
}
public override void Close()
@@ -42,7 +44,7 @@ namespace Apache.NMS.ActiveMQ
return;
}
- lock (this.session.TransactionContext.SyncRoot)
+ lock (this.transactionContext.SyncRoot)
{
if (this.session.IsTransacted || this.session.TransactionContext.InTransaction)
{
@@ -59,5 +61,34 @@ namespace Apache.NMS.ActiveMQ
}
}
}
+
+ /// <summary>
+ /// Holds back consumption of the dispatched message while the enlisted .NET
+ /// transaction is in the Pending state, then runs the base class handling.
+ /// </summary>
+ /// <param name="dispatch">The message dispatch that is about to be consumed.</param>
+ public override void BeforeMessageIsConsumed(MessageDispatch dispatch)
+ {
+     if (!IsAutoAcknowledgeBatch && this.session.IsTransacted)
+     {
+         bool completionPending;
+
+         // When the consumer operates in concert with a distributed TX manager
+         // the DTC completes its operations asynchronously, so consumption must
+         // not start again until all of its tasks have completed.
+         lock (this.transactionContext.SyncRoot)
+         {
+             completionPending =
+                 this.transactionContext.InNetTransaction &&
+                 this.transactionContext.NetTxState == NetTxTransactionContext.TxState.Pending;
+         }
+
+         // The wait happens after the lock has been released.
+         if (completionPending)
+         {
+             this.transactionContext.DtcWaitHandle.WaitOne();
+         }
+     }
+
+     base.BeforeMessageIsConsumed(dispatch);
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Mon May 20 16:57:22 2013
@@ -22,12 +22,15 @@ using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ
{
- public class NetTxSession : Session, INetTxSession
+ public sealed class NetTxSession : Session, INetTxSession
{
+ private readonly NetTxTransactionContext transactionContext;
+
public NetTxSession(Connection connection, SessionId id)
: base(connection, id, AcknowledgementMode.AutoAcknowledge)
{
- TransactionContext.InitializeDtcTxContext();
+ this.transactionContext = TransactionContext as NetTxTransactionContext;
+ this.transactionContext.InitializeDtcTxContext();
}
/// <summary>
@@ -55,7 +58,7 @@ namespace Apache.NMS.ActiveMQ
/// </summary>
public override bool IsTransacted
{
- get { return Transaction.Current != null || TransactionContext.InNetTransaction; }
+ get { return Transaction.Current != null || transactionContext.InNetTransaction; }
}
public override bool IsAutoAcknowledge
@@ -73,17 +76,17 @@ namespace Apache.NMS.ActiveMQ
try
{
- if (TransactionContext.InNetTransaction)
+ if (transactionContext.InNetTransaction)
{
- lock (TransactionContext.SyncRoot)
+ lock (transactionContext.SyncRoot)
{
- if (TransactionContext.InNetTransaction)
+ if (transactionContext.InNetTransaction)
{
// Must wait for all the DTC operations to complete before
// moving on from this close call.
- Monitor.Exit(TransactionContext.SyncRoot);
- this.TransactionContext.DtcWaitHandle.WaitOne();
- Monitor.Enter(TransactionContext.SyncRoot);
+ Monitor.Exit(transactionContext.SyncRoot);
+ this.transactionContext.DtcWaitHandle.WaitOne();
+ Monitor.Enter(transactionContext.SyncRoot);
}
}
}
@@ -104,6 +107,11 @@ namespace Apache.NMS.ActiveMQ
maxPending, noLocal, false, this.DispatchAsync);
}
+ /// <summary>
+ /// Supplies the transaction context for this session: a new
+ /// NetTxTransactionContext constructed around this session instance.
+ /// </summary>
+ protected override TransactionContext CreateTransactionContext()
+ {
+     TransactionContext netTxContext = new NetTxTransactionContext(this);
+     return netTxContext;
+ }
+
internal override void DoRollback()
{
// Only the Transaction Manager can do this when in a .NET Transaction.
@@ -118,18 +126,18 @@ namespace Apache.NMS.ActiveMQ
internal override void DoStartTransaction()
{
- lock (TransactionContext.SyncRoot)
+ lock (transactionContext.SyncRoot)
{
- if (TransactionContext.InNetTransaction && TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+ if (transactionContext.InNetTransaction && transactionContext.NetTxState == NetTxTransactionContext.TxState.Pending)
{
// Too late to participate in this TX; we have to wait for it to complete, then
// we can create a new TX and start from there.
- Monitor.Exit(TransactionContext.SyncRoot);
- TransactionContext.DtcWaitHandle.WaitOne();
- Monitor.Enter(TransactionContext.SyncRoot);
+ Monitor.Exit(transactionContext.SyncRoot);
+ transactionContext.DtcWaitHandle.WaitOne();
+ Monitor.Enter(transactionContext.SyncRoot);
}
- if (!TransactionContext.InNetTransaction && Transaction.Current != null)
+ if (!transactionContext.InNetTransaction && Transaction.Current != null)
{
Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
EnrollInSpecifiedTransaction(Transaction.Current);
@@ -139,7 +147,7 @@ namespace Apache.NMS.ActiveMQ
private void EnrollInSpecifiedTransaction(Transaction tx)
{
- if(TransactionContext.InNetTransaction)
+ if(transactionContext.InNetTransaction)
{
Tracer.Warn("Enlist attempted while a Net TX was Active.");
throw new InvalidOperationException("Session is Already enlisted in a Transaction");
@@ -154,7 +162,7 @@ namespace Apache.NMS.ActiveMQ
// Start a new .NET style transaction, this could be distributed
// or it could just be a Local transaction that could become
// distributed later.
- TransactionContext.Begin(tx);
+ transactionContext.Begin(tx);
}
}
}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs?rev=1484526&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs Mon May 20 16:57:22 2013
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Transactions;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transactions;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ
+{
+ public sealed class NetTxTransactionContext : TransactionContext, ISinglePhaseNotification
+ {
+ private const int XA_OK = 0;
+ private const int XA_READONLY = 3;
+
+ private Enlistment currentEnlistment;
+
+ /// <summary>
+ /// Creates a context for the given session; construction is delegated
+ /// entirely to the base TransactionContext constructor.
+ /// </summary>
+ /// <param name="session">The session this context belongs to.</param>
+ public NetTxTransactionContext(Session session)
+     : base(session)
+ {
+ }
+
+ /// <summary>
+ /// True when a transaction id is set but there is no current enlistment.
+ /// </summary>
+ public override bool InLocalTransaction
+ {
+     get
+     {
+         if (this.transactionId == null)
+         {
+             return false;
+         }
+
+         return this.currentEnlistment == null;
+     }
+ }
+
+ /// <summary>
+ /// Always fails: this context does not support local transactions.
+ /// </summary>
+ /// <exception cref="IllegalStateException">Thrown on every call.</exception>
+ public override void Begin()
+ {
+     IllegalStateException unsupported =
+         new IllegalStateException("Local Transactions not supported in NetTx resources");
+     throw unsupported;
+ }
+
+ /// <summary>
+ /// Always fails: this context does not support local transactions.
+ /// </summary>
+ /// <exception cref="IllegalStateException">Thrown on every call.</exception>
+ public override void Commit()
+ {
+     IllegalStateException unsupported =
+         new IllegalStateException("Local Transactions not supported in NetTx resources");
+     throw unsupported;
+ }
+
+ /// <summary>
+ /// Always fails: this context does not support local transactions.
+ /// </summary>
+ /// <exception cref="IllegalStateException">Thrown on every call.</exception>
+ public override void Rollback()
+ {
+     IllegalStateException unsupported =
+         new IllegalStateException("Local Transactions not supported in NetTx resources");
+     throw unsupported;
+ }
+
+ #region Transaction Members used when dealing with .NET System Transactions.
+
+ // When DTC calls prepare we must then wait for either the TX to commit, rollback or
+ // be canceled because it is in doubt.
+ private readonly ManualResetEvent dtcControlEvent = new ManualResetEvent(true);
+
+ // Monitor lock guarding the .NET transaction state held by this context; it is
+ // also handed out to collaborating classes through SyncRoot. A plain object is
+ // used because, in the code visible here, the field is only ever a target for
+ // lock / Monitor.Enter / Monitor.Exit. A Mutex would allocate an OS wait handle
+ // that is never waited on and never disposed.
+ private readonly object syncObject = new object();
+
+ /// <summary>
+ /// Life-cycle states of the .NET transaction tracked by this context.
+ /// </summary>
+ public enum TxState
+ {
+     /// <summary>Initial state, and the state restored once a completion callback has finished.</summary>
+     None = 0,
+
+     /// <summary>Set when Begin(Transaction) is called.</summary>
+     Active = 1,
+
+     /// <summary>Set when the Prepare callback is entered.</summary>
+     Pending = 2
+ }
+
+ private TxState netTxState = TxState.None;
+
+ /// <summary>
+ /// The object this context locks on while changing its .NET transaction state.
+ /// </summary>
+ public object SyncRoot
+ {
+     get
+     {
+         return this.syncObject;
+     }
+ }
+
+ /// <summary>
+ /// True when the current transaction id is an XATransactionId.
+ /// </summary>
+ public bool InNetTransaction
+ {
+     get
+     {
+         // The type test is already false for a null id, so no separate null check is needed.
+         return this.transactionId is XATransactionId;
+     }
+ }
+
+ /// <summary>
+ /// The current state of the .NET transaction tracked by this context.
+ /// </summary>
+ public TxState NetTxState
+ {
+     get { return this.netTxState; }
+ }
+
+ /// <summary>
+ /// Wait handle backed by the DTC control event: it is reset in
+ /// Begin(Transaction) and set again by the completion callbacks.
+ /// </summary>
+ public WaitHandle DtcWaitHandle
+ {
+     get
+     {
+         return this.dtcControlEvent;
+     }
+ }
+
+ /// <summary>
+ /// Enlists this context as a durable resource in the given .NET transaction and
+ /// notifies the broker that a matching XA style transaction has begun.
+ /// </summary>
+ /// <param name="transaction">The .NET transaction to enlist in.</param>
+ /// <exception cref="ArgumentNullException">The transaction argument is null.</exception>
+ /// <exception cref="TransactionInProgressException">
+ /// This context is already taking part in a .NET transaction.
+ /// </exception>
+ public void Begin(Transaction transaction)
+ {
+     if (transaction == null)
+     {
+         throw new ArgumentNullException("transaction");
+     }
+
+     lock (syncObject)
+     {
+         Tracer.Debug("Begin notification received");
+
+         // Reject the call before touching any state, otherwise a refused Begin
+         // would overwrite the state of the transaction that is still running
+         // (for example flip Pending back to Active).
+         if (InNetTransaction)
+         {
+             throw new TransactionInProgressException("A Transaction is already in Progress");
+         }
+
+         this.netTxState = TxState.Active;
+         dtcControlEvent.Reset();
+
+         try
+         {
+             Guid rmId = ResourceManagerGuid;
+
+             // Enlist this object in the transaction.
+             this.currentEnlistment =
+                 transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
+
+             Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
+
+             TransactionInformation txInfo = transaction.TransactionInformation;
+
+             XATransactionId xaId = new XATransactionId();
+             this.transactionId = xaId;
+
+             // Use the distributed identifier when one has been assigned,
+             // otherwise fall back to the local identifier.
+             if (txInfo.DistributedIdentifier != Guid.Empty)
+             {
+                 xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
+             }
+             else
+             {
+                 xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+             }
+
+             xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+
+             // Now notify the broker that a new XA'ish transaction has started.
+             TransactionInfo info = new TransactionInfo();
+             info.ConnectionId = this.connection.ConnectionId;
+             info.TransactionId = this.transactionId;
+             info.Type = (int)TransactionType.Begin;
+
+             this.session.Connection.Oneway(info);
+
+             SignalTransactionStarted();
+
+             if (Tracer.IsDebugEnabled)
+             {
+                 // Format the bytes explicitly; concatenating the array itself
+                 // would only print its type name.
+                 Tracer.Debug("Began XA'ish Transaction:" +
+                              BitConverter.ToString(xaId.GlobalTransactionId));
+             }
+         }
+         catch (Exception)
+         {
+             // Release anyone waiting on the DTC handle before reporting the failure.
+             dtcControlEvent.Set();
+             throw;
+         }
+     }
+ }
+
+ /// <summary>
+ /// Prepare callback of the two phase commit: logs the recovery information,
+ /// ends the transaction branch on the broker and asks the broker to prepare it,
+ /// then reports the result to the enlistment.
+ /// </summary>
+ /// <param name="preparingEnlistment">
+ /// Enlistment used to obtain the recovery information and to report either
+ /// Prepared or ForceRollback.
+ /// </param>
+ public void Prepare(PreparingEnlistment preparingEnlistment)
+ {
+     lock (this.syncObject)
+     {
+         this.netTxState = TxState.Pending;
+
+         try
+         {
+             Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
+
+             BeforeEnd();
+
+             // Before sending the request to the broker, log the recovery bits, if
+             // this fails we can't prepare and the TX should be rolled back.
+             RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
+                                            preparingEnlistment.RecoveryInformation());
+
+             // Inform the broker that work on the XA'sh TX Branch is complete.
+             TransactionInfo info = new TransactionInfo();
+             info.ConnectionId = this.connection.ConnectionId;
+             info.TransactionId = this.transactionId;
+             info.Type = (int)TransactionType.End;
+
+             this.connection.CheckConnected();
+             this.connection.SyncRequest(info);
+
+             // Prepare the Transaction for commit.
+             info.Type = (int)TransactionType.Prepare;
+             IntegerResponse response = (IntegerResponse)this.connection.SyncRequest(info);
+             if (response.Result == XA_READONLY)
+             {
+                 Tracer.Debug("Transaction Prepare done and doesn't need a commit, TX id: " + this.transactionId);
+
+                 // Remember the id before clearing it so the recovery logger is
+                 // told which transaction finished instead of receiving null.
+                 XATransactionId completedId = this.transactionId as XATransactionId;
+
+                 this.transactionId = null;
+                 this.currentEnlistment = null;
+
+                 // Read Only means there's nothing to recover because there was no
+                 // change on the broker.
+                 RecoveryLogger.LogRecovered(completedId);
+
+                 // if server responds that nothing needs to be done, then reply prepared
+                 // but clear the current state data so we appear done to the commit method.
+                 preparingEnlistment.Prepared();
+
+                 // Done so commit won't be called.
+                 AfterCommit();
+
+                 // A Read-Only TX is considered closed at this point, so leave the
+                 // state as every other terminal path does and release waiters.
+                 this.netTxState = TxState.None;
+                 this.dtcControlEvent.Set();
+             }
+             else
+             {
+                 Tracer.Debug("Transaction Prepare succeeded TX id: " + this.transactionId);
+
+                 // If work finished correctly, reply prepared
+                 preparingEnlistment.Prepared();
+             }
+         }
+         catch (Exception ex)
+         {
+             Tracer.DebugFormat("Transaction[{0}] Prepare failed with error: {1}",
+                                this.transactionId, ex.Message);
+
+             try
+             {
+                 AfterRollback();
+                 preparingEnlistment.ForceRollback();
+                 try
+                 {
+                     this.connection.OnException(ex);
+                 }
+                 catch (Exception error)
+                 {
+                     Tracer.Error(error.ToString());
+                 }
+             }
+             finally
+             {
+                 // Always reset the state and release waiters, even if the
+                 // rollback handling above throws.
+                 this.currentEnlistment = null;
+                 this.transactionId = null;
+                 this.netTxState = TxState.None;
+                 this.dtcControlEvent.Set();
+             }
+         }
+     }
+ }
+
+ /// <summary>
+ /// Commit callback of the two phase commit: asks the broker to commit the
+ /// prepared transaction and marks the enlistment as done.
+ /// </summary>
+ /// <param name="enlistment">Enlistment to notify once the commit has been sent.</param>
+ public void Commit(Enlistment enlistment)
+ {
+     lock (this.syncObject)
+     {
+         try
+         {
+             Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
+
+             if (this.transactionId == null)
+             {
+                 // Nothing to commit; the finally block below still resets the state.
+                 return;
+             }
+
+             // Tell the broker to complete the second phase of the commit.
+             TransactionInfo commitInfo = new TransactionInfo();
+             commitInfo.ConnectionId = this.connection.ConnectionId;
+             commitInfo.TransactionId = this.transactionId;
+             commitInfo.Type = (int)TransactionType.CommitTwoPhase;
+
+             this.connection.CheckConnected();
+             this.connection.SyncRequest(commitInfo);
+
+             Tracer.Debug("Transaction Commit Done TX id: " + this.transactionId);
+
+             RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+             // The broker accepted the commit, so report completion.
+             enlistment.Done();
+
+             AfterCommit();
+         }
+         catch (Exception failure)
+         {
+             Tracer.DebugFormat("Transaction[{0}] Commit failed with error: {1}",
+                                this.transactionId, failure.Message);
+             try
+             {
+                 this.connection.OnException(failure);
+             }
+             catch (Exception nested)
+             {
+                 Tracer.Error(nested.ToString());
+             }
+         }
+         finally
+         {
+             this.currentEnlistment = null;
+             this.transactionId = null;
+             this.netTxState = TxState.None;
+
+             // Count down the recovery latch when one is set.
+             CountDownLatch recoveryLatch = this.recoveryComplete;
+             if (recoveryLatch != null)
+             {
+                 recoveryLatch.countDown();
+             }
+
+             this.dtcControlEvent.Set();
+         }
+     }
+ }
+
+ /// <summary>
+ /// Single phase commit callback: asks the broker to commit the transaction in
+ /// one phase and reports the outcome to the enlistment.
+ /// </summary>
+ /// <param name="enlistment">
+ /// Enlistment that is told Done when the broker commit succeeds and Aborted
+ /// when it fails.
+ /// </param>
+ public void SinglePhaseCommit(SinglePhaseEnlistment enlistment)
+ {
+     lock (this.syncObject)
+     {
+         // Tracks whether the enlistment has already been given an outcome so the
+         // failure path never reports a second, conflicting one.
+         bool outcomeReported = false;
+
+         try
+         {
+             Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
+
+             if (this.transactionId != null)
+             {
+                 BeforeEnd();
+
+                 // Now notify the broker that a new XA'ish transaction has completed.
+                 TransactionInfo info = new TransactionInfo();
+                 info.ConnectionId = this.connection.ConnectionId;
+                 info.TransactionId = this.transactionId;
+                 info.Type = (int)TransactionType.CommitOnePhase;
+
+                 this.connection.CheckConnected();
+                 this.connection.SyncRequest(info);
+
+                 Tracer.Debug("Transaction Single Phase Commit Done TX id: " + this.transactionId);
+
+                 // The broker accepted the commit, so report completion.
+                 enlistment.Done();
+                 outcomeReported = true;
+
+                 AfterCommit();
+             }
+         }
+         catch (Exception ex)
+         {
+             Tracer.DebugFormat("Transaction[{0}] Single Phase Commit failed with error: {1}",
+                                this.transactionId, ex.Message);
+             AfterRollback();
+
+             // Report the failure rather than completion, so the transaction
+             // manager is not told the work finished when the commit failed.
+             if (!outcomeReported)
+             {
+                 enlistment.Aborted(ex);
+             }
+
+             try
+             {
+                 this.connection.OnException(ex);
+             }
+             catch (Exception error)
+             {
+                 Tracer.Error(error.ToString());
+             }
+         }
+         finally
+         {
+             this.currentEnlistment = null;
+             this.transactionId = null;
+             this.netTxState = TxState.None;
+
+             this.dtcControlEvent.Set();
+         }
+     }
+ }
+
+ /// <summary>
+ /// Rollback callback: ends the transaction branch on the broker, asks the
+ /// broker to roll it back and marks the enlistment as done.
+ /// </summary>
+ /// <param name="enlistment">Enlistment to notify once the rollback has been sent.</param>
+ public void Rollback(Enlistment enlistment)
+ {
+     lock (this.syncObject)
+     {
+         try
+         {
+             Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
+
+             if (this.transactionId == null)
+             {
+                 // Nothing to roll back; the finally block below still resets the state.
+                 return;
+             }
+
+             BeforeEnd();
+
+             // First end the transaction branch on the broker...
+             TransactionInfo rollbackInfo = new TransactionInfo();
+             rollbackInfo.ConnectionId = this.connection.ConnectionId;
+             rollbackInfo.TransactionId = this.transactionId;
+             rollbackInfo.Type = (int)TransactionType.End;
+
+             this.connection.CheckConnected();
+             this.connection.SyncRequest(rollbackInfo);
+
+             // ...then reuse the same command to request the rollback.
+             rollbackInfo.Type = (int)TransactionType.Rollback;
+             this.connection.CheckConnected();
+             this.connection.SyncRequest(rollbackInfo);
+
+             Tracer.Debug("Transaction Rollback Done TX id: " + this.transactionId);
+
+             RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+             // The broker accepted the rollback, so report completion.
+             enlistment.Done();
+
+             AfterRollback();
+         }
+         catch (Exception failure)
+         {
+             Tracer.DebugFormat("Transaction[{0}] Rollback failed with error: {1}",
+                                this.transactionId, failure.Message);
+             AfterRollback();
+             try
+             {
+                 this.connection.OnException(failure);
+             }
+             catch (Exception nested)
+             {
+                 Tracer.Error(nested.ToString());
+             }
+         }
+         finally
+         {
+             this.currentEnlistment = null;
+             this.transactionId = null;
+             this.netTxState = TxState.None;
+
+             // Count down the recovery latch when one is set.
+             CountDownLatch recoveryLatch = this.recoveryComplete;
+             if (recoveryLatch != null)
+             {
+                 recoveryLatch.countDown();
+             }
+
+             this.dtcControlEvent.Set();
+         }
+     }
+ }
+
+ /// <summary>
+ /// In-doubt callback: ends the transaction branch on the broker, asks the
+ /// broker to roll it back and marks the enlistment as done. Exceptions are not
+ /// caught here; only the state reset in the finally block is guaranteed.
+ /// </summary>
+ /// <param name="enlistment">Enlistment to notify once the rollback has been sent.</param>
+ public void InDoubt(Enlistment enlistment)
+ {
+     lock (syncObject)
+     {
+         try
+         {
+             Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
+
+             BeforeEnd();
+
+             // End the transaction branch on the broker first...
+             TransactionInfo rollbackInfo = new TransactionInfo();
+             rollbackInfo.ConnectionId = this.connection.ConnectionId;
+             rollbackInfo.TransactionId = this.transactionId;
+             rollbackInfo.Type = (int)TransactionType.End;
+
+             this.connection.CheckConnected();
+             this.connection.SyncRequest(rollbackInfo);
+
+             // ...then reuse the same command to request the rollback.
+             rollbackInfo.Type = (int)TransactionType.Rollback;
+             this.connection.CheckConnected();
+             this.connection.SyncRequest(rollbackInfo);
+
+             Tracer.Debug("InDoubt Transaction Rollback Done TX id: " + this.transactionId);
+
+             RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
+             // The broker accepted the rollback, so report completion.
+             enlistment.Done();
+
+             AfterRollback();
+         }
+         finally
+         {
+             this.currentEnlistment = null;
+             this.transactionId = null;
+             this.netTxState = TxState.None;
+
+             // Count down the recovery latch when one is set.
+             CountDownLatch recoveryLatch = this.recoveryComplete;
+             if (recoveryLatch != null)
+             {
+                 recoveryLatch.countDown();
+             }
+
+             this.dtcControlEvent.Set();
+         }
+     }
+ }
+
+ #endregion
+
+ #region Distributed Transaction Recovery Bits
+
+ private volatile CountDownLatch recoveryComplete;
+
+ /// <summary>
+ /// Intended to be called from NetTxSession when it is created. Compares the
+ /// transaction records held by the recovery logger with the transactions the
+ /// broker reports as recoverable. Every record that matches is re-enlisted
+ /// with the transaction manager and this method waits until all of them have
+ /// been completed; when nothing matches the stored recovery data is purged.
+ /// </summary>
+ public void InitializeDtcTxContext()
+ {
+     // initialize the logger with the current Resource Manager Id
+     RecoveryLogger.Initialize(ResourceManagerId);
+
+     KeyValuePair<XATransactionId, byte[]>[] storedRecords = RecoveryLogger.GetRecoverables();
+     if (storedRecords.Length == 0)
+     {
+         // Without local records nothing held by the broker can be recovered here.
+         Tracer.Debug("Did not detect any open DTC transaction records on disk.");
+         return;
+     }
+
+     XATransactionId[] brokerIds = TryRecoverBrokerTXIds();
+     if (brokerIds.Length == 0)
+     {
+         // The broker has nothing to recover, so the stored records are stale.
+         Tracer.Debug("Did not detect any recoverable transactions at Broker.");
+         RecoveryLogger.Purge();
+         return;
+     }
+
+     // Collect every stored record whose id the broker also reports.
+     List<KeyValuePair<XATransactionId, byte[]>> matched = new List<KeyValuePair<XATransactionId, byte[]>>();
+
+     for (int b = 0; b < brokerIds.Length; b++)
+     {
+         for (int s = 0; s < storedRecords.Length; s++)
+         {
+             KeyValuePair<XATransactionId, byte[]> record = storedRecords[s];
+             if (record.Key.Equals(brokerIds[b]))
+             {
+                 Tracer.DebugFormat("Found a matching TX on Broker to stored Id: {0} reenlisting.", record.Key);
+                 matched.Add(record);
+             }
+         }
+     }
+
+     if (matched.Count == 0)
+     {
+         // The old recovery information doesn't match what's on the broker,
+         // so it is stale and gets discarded.
+         RecoveryLogger.Purge();
+         return;
+     }
+
+     // One latch count per re-enlisted transaction.
+     this.recoveryComplete = new CountDownLatch(matched.Count);
+
+     foreach (KeyValuePair<XATransactionId, byte[]> record in matched)
+     {
+         this.transactionId = record.Key;
+         Tracer.Info("Reenlisting recovered TX with Id: " + this.transactionId);
+         this.currentEnlistment =
+             TransactionManager.Reenlist(ResourceManagerGuid, record.Value, this);
+     }
+
+     this.recoveryComplete.await();
+     Tracer.Debug("All Recovered TX enlistments Reports complete, Recovery Complete.");
+     TransactionManager.RecoveryComplete(ResourceManagerGuid);
+ }
+
+ /// <summary>
+ /// Sends a Recover request to the broker and returns the XA transaction ids
+ /// found in the reply.
+ /// </summary>
+ /// <returns>
+ /// The reported XATransactionId entries, or an empty array when the reply is
+ /// not a DataArrayResponse or carries no data.
+ /// </returns>
+ private XATransactionId[] TryRecoverBrokerTXIds()
+ {
+     Tracer.Debug("Checking for Recoverable Transactions on Broker.");
+
+     TransactionInfo recoverRequest = new TransactionInfo();
+     recoverRequest.ConnectionId = this.session.Connection.ConnectionId;
+     recoverRequest.Type = (int)TransactionType.Recover;
+
+     this.connection.CheckConnected();
+     DataArrayResponse reply = this.connection.SyncRequest(recoverRequest) as DataArrayResponse;
+
+     if (reply == null || reply.Data.Length <= 0)
+     {
+         return new XATransactionId[0];
+     }
+
+     Tracer.DebugFormat("Broker reports there are {0} recoverable XA Transactions", reply.Data.Length);
+
+     List<XATransactionId> brokerIds = new List<XATransactionId>();
+
+     // Keep only the entries that are XA transaction ids.
+     foreach (DataStructure entry in reply.Data)
+     {
+         XATransactionId candidate = entry as XATransactionId;
+         if (candidate == null)
+         {
+             continue;
+         }
+
+         brokerIds.Add(candidate);
+     }
+
+     return brokerIds.ToArray();
+ }
+
+ #endregion
+
+ /// <summary>
+ /// The recovery logger taken from the recovery policy of the owning connection,
+ /// which is treated as a NetTxConnection.
+ /// </summary>
+ internal IRecoveryLogger RecoveryLogger
+ {
+     get
+     {
+         NetTxConnection netTxConnection = this.connection as NetTxConnection;
+         return netTxConnection.RecoveryPolicy.RecoveryLogger;
+     }
+ }
+
+ /// <summary>
+ /// String form of the resource manager Guid of the owning connection, which is
+ /// treated as a NetTxConnection.
+ /// </summary>
+ internal string ResourceManagerId
+ {
+     get
+     {
+         NetTxConnection netTxConnection = this.connection as NetTxConnection;
+         return netTxConnection.ResourceManagerGuid.ToString();
+     }
+ }
+
+ /// <summary>
+ /// The resource manager Guid of the owning connection, which is treated as a
+ /// NetTxConnection.
+ /// </summary>
+ internal Guid ResourceManagerGuid
+ {
+     get
+     {
+         NetTxConnection netTxConnection = this.connection as NetTxConnection;
+         return netTxConnection.ResourceManagerGuid;
+     }
+ }
+
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Mon May 20 16:57:22 2013
@@ -304,11 +304,6 @@ namespace Apache.NMS.ActiveMQ
return;
}
- if(disposing)
- {
- // Dispose managed code here.
- }
-
try
{
// Force a Stop when we are Disposing vs a Normal Close.
@@ -344,10 +339,10 @@ namespace Apache.NMS.ActiveMQ
internal void DoClose()
{
Shutdown();
- RemoveInfo info = new RemoveInfo();
- info.ObjectId = this.info.SessionId;
- info.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
- this.connection.Oneway(info);
+ RemoveInfo removeInfo = new RemoveInfo();
+ removeInfo.ObjectId = this.info.SessionId;
+ removeInfo.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+ this.connection.Oneway(removeInfo);
}
internal void Shutdown()
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Mon May 20 16:57:22 2013
@@ -15,15 +15,8 @@
* limitations under the License.
*/
-using System;
-using System.Text;
-using System.Threading;
-using System.Transactions;
using System.Collections;
-using System.Collections.Generic;
-using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Transactions;
namespace Apache.NMS.ActiveMQ
{
@@ -35,16 +28,12 @@ namespace Apache.NMS.ActiveMQ
namespace Apache.NMS.ActiveMQ
{
- public class TransactionContext : ISinglePhaseNotification
+ public class TransactionContext
{
- private const int XA_OK = 0;
- private const int XA_READONLY = 3;
-
- private TransactionId transactionId;
- private readonly Session session;
- private readonly Connection connection;
- private readonly ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
- private Enlistment currentEnlistment;
+ protected TransactionId transactionId;
+ protected readonly Session session;
+ protected readonly Connection connection;
+ protected readonly ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
public TransactionContext(Session session)
{
@@ -57,9 +46,9 @@ namespace Apache.NMS.ActiveMQ
get{ return this.transactionId != null; }
}
- public bool InLocalTransaction
+ public virtual bool InLocalTransaction
{
- get{ return this.transactionId != null && this.currentEnlistment == null; }
+ get{ return this.transactionId != null; }
}
public TransactionId TransactionId
@@ -67,9 +56,6 @@ namespace Apache.NMS.ActiveMQ
get { return transactionId; }
}
- /// <summary>
- /// Method AddSynchronization
- /// </summary>
public void AddSynchronization(ISynchronization synchronization)
{
synchronizations.Add(synchronization);
@@ -80,7 +66,7 @@ namespace Apache.NMS.ActiveMQ
synchronizations.Remove(synchronization);
}
- public void Begin()
+ public virtual void Begin()
{
if(!InTransaction)
{
@@ -93,19 +79,16 @@ namespace Apache.NMS.ActiveMQ
this.session.Connection.Oneway(info);
- if(this.TransactionStartedListener != null)
- {
- this.TransactionStartedListener(this.session);
- }
+ SignalTransactionStarted();
if(Tracer.IsDebugEnabled)
{
- Tracer.Debug("Begin:" + this.transactionId.ToString());
+ Tracer.Debug("Begin:" + this.transactionId);
}
}
}
- public void Rollback()
+ public virtual void Rollback()
{
if(InTransaction)
{
@@ -130,7 +113,7 @@ namespace Apache.NMS.ActiveMQ
}
}
- public void Commit()
+ public virtual void Commit()
{
if(InTransaction)
{
@@ -177,10 +160,7 @@ namespace Apache.NMS.ActiveMQ
synchronization.AfterCommit();
}
- if(this.TransactionCommittedListener != null)
- {
- this.TransactionCommittedListener(this.session);
- }
+ SignalTransactionCommitted();
}
}
finally
@@ -200,10 +180,7 @@ namespace Apache.NMS.ActiveMQ
synchronization.AfterRollback();
}
- if(this.TransactionRolledBackListener != null)
- {
- this.TransactionRolledBackListener(this.session);
- }
+ SignalTransactionRolledBack();
}
}
finally
@@ -218,547 +195,31 @@ namespace Apache.NMS.ActiveMQ
public event SessionTxEventDelegate TransactionCommittedListener;
public event SessionTxEventDelegate TransactionRolledBackListener;
- #endregion
-
- #region Transaction Members used when dealing with .NET System Transactions.
-
- // When DTC calls prepare we must then wait for either the TX to commit, rollback or
- // be canceled because its in doubt.
- private readonly ManualResetEvent dtcControlEvent = new ManualResetEvent(true);
-
- // Once the DTC calls prepare we lock this object and don't unlock it again until
- // the TX has either completed or terminated, the users of this class should use
- // this sync point when the TX is a DTC version as opposed to a local one.
- private readonly object syncObject = new Mutex();
-
- public enum TxState
- {
- None = 0, Active = 1, Pending = 2
- }
-
- private TxState netTxState = TxState.None;
-
- public object SyncRoot
- {
- get { return this.syncObject; }
- }
-
- public bool InNetTransaction
- {
- get{ return this.transactionId != null && this.transactionId is XATransactionId; }
- }
-
- public TxState NetTxState
+ protected void SignalTransactionStarted()
{
- get
+ if (this.TransactionStartedListener != null)
{
- return this.netTxState;
+ this.TransactionStartedListener(this.session);
}
}
- public WaitHandle DtcWaitHandle
- {
- get { return dtcControlEvent; }
- }
-
- public void Begin(Transaction transaction)
+ protected void SignalTransactionCommitted()
{
- lock (syncObject)
+ if (this.TransactionCommittedListener != null)
{
- this.netTxState = TxState.Active;
- dtcControlEvent.Reset();
-
- Tracer.Debug("Begin notification received");
-
- if (InNetTransaction)
- {
- throw new TransactionInProgressException("A Transaction is already in Progress");
- }
-
- try
- {
- Guid rmId = ResourceManagerGuid;
-
- // Enlist this object in the transaction.
- this.currentEnlistment =
- transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
-
- Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
-
- TransactionInformation txInfo = transaction.TransactionInformation;
-
- XATransactionId xaId = new XATransactionId();
- this.transactionId = xaId;
-
- if (txInfo.DistributedIdentifier != Guid.Empty)
- {
- xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
- xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
- }
- else
- {
- xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
- xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
- }
-
- // Now notify the broker that a new XA'ish transaction has started.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.Begin;
-
- this.session.Connection.Oneway(info);
-
- if (this.TransactionStartedListener != null)
- {
- this.TransactionStartedListener(this.session);
- }
-
- if (Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
- }
- }
- catch (Exception)
- {
- dtcControlEvent.Set();
- throw;
- }
+ this.TransactionCommittedListener(this.session);
}
}
- public void Prepare(PreparingEnlistment preparingEnlistment)
+ protected void SignalTransactionRolledBack()
{
- lock (this.syncObject)
+ if (this.TransactionRolledBackListener != null)
{
- this.netTxState = TxState.Pending;
-
- try
- {
- Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
-
- BeforeEnd();
-
- // Before sending the request to the broker, log the recovery bits, if
- // this fails we can't prepare and the TX should be rolled back.
- RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
- preparingEnlistment.RecoveryInformation());
-
- // Inform the broker that work on the XA'sh TX Branch is complete.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.End;
-
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
-
- // Prepare the Transaction for commit.
- info.Type = (int) TransactionType.Prepare;
- IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
- if (response.Result == XA_READONLY)
- {
- Tracer.Debug("Transaction Prepare done and doesn't need a commit, TX id: " + this.transactionId);
-
- this.transactionId = null;
- this.currentEnlistment = null;
-
- // Read Only means there's nothing to recover because there was no
- // change on the broker.
- RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
-
- // if server responds that nothing needs to be done, then reply prepared
- // but clear the current state data so we appear done to the commit method.
- preparingEnlistment.Prepared();
-
- // Done so commit won't be called.
- AfterCommit();
-
- // A Read-Only TX is considered closed at this point, DTC won't call us again.
- this.dtcControlEvent.Set();
- }
- else
- {
- Tracer.Debug("Transaction Prepare succeeded TX id: " + this.transactionId);
-
- // If work finished correctly, reply prepared
- preparingEnlistment.Prepared();
- }
- }
- catch (Exception ex)
- {
- Tracer.DebugFormat("Transaction[{0}] Prepare failed with error: {1}",
- this.transactionId, ex.Message);
-
- AfterRollback();
- preparingEnlistment.ForceRollback();
- try
- {
- this.connection.OnException(ex);
- }
- catch (Exception error)
- {
- Tracer.Error(error.ToString());
- }
-
- this.currentEnlistment = null;
- this.transactionId = null;
- this.netTxState = TxState.None;
- this.dtcControlEvent.Set();
- }
+ this.TransactionRolledBackListener(this.session);
}
}
- public void Commit(Enlistment enlistment)
- {
- lock (this.syncObject)
- {
- try
- {
- Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
-
- if (this.transactionId != null)
- {
- // Now notify the broker that a new XA'ish transaction has completed.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.CommitTwoPhase;
-
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
-
- Tracer.Debug("Transaction Commit Done TX id: " + this.transactionId);
-
- RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
-
- // if server responds that nothing needs to be done, then reply done.
- enlistment.Done();
-
- AfterCommit();
- }
- }
- catch (Exception ex)
- {
- Tracer.DebugFormat("Transaction[{0}] Commit failed with error: {1}",
- this.transactionId, ex.Message);
- try
- {
- this.connection.OnException(ex);
- }
- catch (Exception error)
- {
- Tracer.Error(error.ToString());
- }
- }
- finally
- {
- this.currentEnlistment = null;
- this.transactionId = null;
- this.netTxState = TxState.None;
-
- CountDownLatch latch = this.recoveryComplete;
- if (latch != null)
- {
- latch.countDown();
- }
-
- this.dtcControlEvent.Set();
- }
- }
- }
-
- public void SinglePhaseCommit(SinglePhaseEnlistment enlistment)
- {
- lock (this.syncObject)
- {
- try
- {
- Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
-
- if (this.transactionId != null)
- {
- BeforeEnd();
-
- // Now notify the broker that a new XA'ish transaction has completed.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.CommitOnePhase;
-
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
-
- Tracer.Debug("Transaction Single Phase Commit Done TX id: " + this.transactionId);
-
- // if server responds that nothing needs to be done, then reply done.
- enlistment.Done();
-
- AfterCommit();
- }
- }
- catch (Exception ex)
- {
- Tracer.DebugFormat("Transaction[{0}] Single Phase Commit failed with error: {1}",
- this.transactionId, ex.Message);
- AfterRollback();
- enlistment.Done();
- try
- {
- this.connection.OnException(ex);
- }
- catch (Exception error)
- {
- Tracer.Error(error.ToString());
- }
- }
- finally
- {
- this.currentEnlistment = null;
- this.transactionId = null;
- this.netTxState = TxState.None;
-
- this.dtcControlEvent.Set();
- }
- }
- }
-
- public void Rollback(Enlistment enlistment)
- {
- lock (this.syncObject)
- {
- try
- {
- Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
-
- if (this.transactionId != null)
- {
- BeforeEnd();
-
- // Now notify the broker that a new XA'ish transaction has started.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.End;
-
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
-
- info.Type = (int) TransactionType.Rollback;
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
-
- Tracer.Debug("Transaction Rollback Done TX id: " + this.transactionId);
-
- RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
-
- // if server responds that nothing needs to be done, then reply done.
- enlistment.Done();
-
- AfterRollback();
- }
- }
- catch (Exception ex)
- {
- Tracer.DebugFormat("Transaction[{0}] Rollback failed with error: {1}",
- this.transactionId, ex.Message);
- AfterRollback();
- try
- {
- this.connection.OnException(ex);
- }
- catch (Exception error)
- {
- Tracer.Error(error.ToString());
- }
- }
- finally
- {
- this.currentEnlistment = null;
- this.transactionId = null;
- this.netTxState = TxState.None;
-
- CountDownLatch latch = this.recoveryComplete;
- if (latch != null)
- {
- latch.countDown();
- }
-
- this.dtcControlEvent.Set();
- }
- }
- }
-
- public void InDoubt(Enlistment enlistment)
- {
- lock (syncObject)
- {
- try
- {
- Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
-
- BeforeEnd();
-
- // Now notify the broker that Rollback should be performed.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.End;
-
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
-
- info.Type = (int) TransactionType.Rollback;
- this.connection.CheckConnected();
- this.connection.SyncRequest(info);
-
- Tracer.Debug("InDoubt Transaction Rollback Done TX id: " + this.transactionId);
-
- RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
-
- // if server responds that nothing needs to be done, then reply done.
- enlistment.Done();
-
- AfterRollback();
- }
- finally
- {
- this.currentEnlistment = null;
- this.transactionId = null;
- this.netTxState = TxState.None;
-
- CountDownLatch latch = this.recoveryComplete;
- if (latch != null)
- {
- latch.countDown();
- }
-
- this.dtcControlEvent.Set();
- }
- }
- }
-
- #endregion
-
- #region Distributed Transaction Recovery Bits
-
- private volatile CountDownLatch recoveryComplete = null;
-
- /// <summary>
- /// Should be called from NetTxSession when created to check if any TX
- /// data is stored for recovery and whether the Broker has matching info
- /// stored. If an Transaction is found that belongs to this client and is
- /// still alive on the Broker it will be recovered, otherwise the stored
- /// data should be cleared.
- /// </summary>
- public void InitializeDtcTxContext()
- {
- // initialize the logger with the current Resource Manager Id
- RecoveryLogger.Initialize(ResourceManagerId);
-
- KeyValuePair<XATransactionId, byte[]>[] localRecoverables = RecoveryLogger.GetRecoverables();
- if (localRecoverables.Length == 0)
- {
- Tracer.Debug("Did not detect any open DTC transaction records on disk.");
- // No local data so anything stored on the broker can't be recovered here.
- return;
- }
-
- XATransactionId[] recoverables = TryRecoverBrokerTXIds();
- if (recoverables.Length == 0)
- {
- Tracer.Debug("Did not detect any recoverable transactions at Broker.");
- // Broker has no recoverable data so nothing to do here, delete the
- // old recovery log as its stale.
- RecoveryLogger.Purge();
- return;
- }
-
- List<KeyValuePair<XATransactionId, byte[]>> matches = new List<KeyValuePair<XATransactionId, byte[]>>();
-
- foreach(XATransactionId recoverable in recoverables)
- {
- foreach(KeyValuePair<XATransactionId, byte[]> entry in localRecoverables)
- {
- if(entry.Key.Equals(recoverable))
- {
- Tracer.DebugFormat("Found a matching TX on Broker to stored Id: {0} reenlisting.", entry.Key);
- matches.Add(entry);
- }
- }
- }
-
- if (matches.Count != 0)
- {
- this.recoveryComplete = new CountDownLatch(matches.Count);
-
- foreach (KeyValuePair<XATransactionId, byte[]> recoverable in matches)
- {
- this.transactionId = recoverable.Key;
- Tracer.Info("Reenlisting recovered TX with Id: " + this.transactionId);
- this.currentEnlistment =
- TransactionManager.Reenlist(ResourceManagerGuid, recoverable.Value, this);
- }
-
- this.recoveryComplete.await();
- Tracer.Debug("All Recovered TX enlistments Reports complete, Recovery Complete.");
- TransactionManager.RecoveryComplete(ResourceManagerGuid);
- return;
- }
-
- // The old recovery information doesn't match what's on the broker so we
- // should discard it as its stale now.
- RecoveryLogger.Purge();
- }
-
- private XATransactionId[] TryRecoverBrokerTXIds()
- {
- Tracer.Debug("Checking for Recoverable Transactions on Broker.");
-
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.session.Connection.ConnectionId;
- info.Type = (int)TransactionType.Recover;
-
- this.connection.CheckConnected();
- DataArrayResponse response = this.connection.SyncRequest(info) as DataArrayResponse;
-
- if (response != null && response.Data.Length > 0)
- {
- Tracer.DebugFormat("Broker reports there are {0} recoverable XA Transactions", response.Data.Length);
-
- List<XATransactionId> recovered = new List<XATransactionId>();
-
- foreach (DataStructure ds in response.Data)
- {
- XATransactionId xid = ds as XATransactionId;
- if (xid != null)
- {
- recovered.Add(xid);
- }
- }
-
- return recovered.ToArray();
- }
-
- return new XATransactionId[0];
- }
-
- #endregion
-
- internal IRecoveryLogger RecoveryLogger
- {
- get { return (this.connection as NetTxConnection).RecoveryPolicy.RecoveryLogger; }
- }
-
- internal string ResourceManagerId
- {
- get { return (this.connection as NetTxConnection).ResourceManagerGuid.ToString(); }
- }
-
- internal Guid ResourceManagerGuid
- {
- get { return (this.connection as NetTxConnection).ResourceManagerGuid; }
- }
+ #endregion
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj?rev=1484526&r1=1484525&r2=1484526&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj Mon May 20 16:57:22 2013
@@ -318,6 +318,7 @@
<Compile Include="src\main\csharp\NetTxMessageConsumer.cs" />
<Compile Include="src\main\csharp\NetTxRecoveryPolicy.cs" />
<Compile Include="src\main\csharp\NetTxSession.cs" />
+ <Compile Include="src\main\csharp\NetTxTransactionContext.cs" />
<Compile Include="src\main\csharp\OpenWire\BaseDataStreamMarshaller.cs">
<SubType>Code</SubType>
</Compile>