You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/26 23:31:05 UTC
svn commit: r1096919 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src:
main/csharp/MessageConsumer.cs main/csharp/NetTxSession.cs
main/csharp/Session.cs main/csharp/TransactionContext.cs
test/csharp/DtcConsumerTransactionsTest.cs
Author: tabish
Date: Tue Apr 26 21:31:05 2011
New Revision: 1096919
URL: http://svn.apache.org/viewvc?rev=1096919&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-326
Fixes some more edge cases on TX rollback and message consumption.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/NetTxSession.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/TransactionContext.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs?rev=1096919&r1=1096918&r2=1096919&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs Tue Apr 26 21:31:05 2011
@@ -330,11 +330,15 @@ namespace Apache.NMS.ActiveMQ
{
if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
{
- this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
+ Tracer.DebugFormat("Consumer {0} Registering new ConsumerCloseSynchronization",
+ this.info.ConsumerId);
+ this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
}
else
{
- this.DoClose();
+ Tracer.DebugFormat("Consumer {0} No Active TX closing normally.",
+ this.info.ConsumerId);
+ this.DoClose();
}
}
}
@@ -765,12 +769,18 @@ namespace Apache.NMS.ActiveMQ
{
if (this.session.IsTransacted)
{
- // In the case where the consumer is operating in concert with
- // a distributed TX manager we need to wait whenever the TX
- // is controlled by the DTC as it completes all operations
- // async and we cannot start consumption again until all its
- // tasks have completed.
- this.session.TransactionContext.DtcWaitHandle.WaitOne();
+ this.session.TransactionContext.SyncRoot.WaitOne();
+
+ // In the case where the consumer is operating in concert with a
+ // distributed TX manager we need to wait whenever the TX is being
+ // controlled by the DTC as it completes all operations async and
+ // we cannot start consumption again until all its tasks have completed.
+ if (this.session.TransactionContext.InNetTransaction &&
+ this.session.TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+ {
+ this.session.TransactionContext.SyncRoot.ReleaseMutex();
+ this.session.TransactionContext.DtcWaitHandle.WaitOne();
+ }
}
lock(this.dispatchedMessages)
@@ -780,7 +790,7 @@ namespace Apache.NMS.ActiveMQ
if(this.session.IsTransacted)
{
- this.session.TransactionContext.DtcWaitHandle.WaitOne();
+ //this.session.TransactionContext.DtcWaitHandle.WaitOne();
this.AckLater(dispatch, AckType.DeliveredAck);
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/NetTxSession.cs?rev=1096919&r1=1096918&r2=1096919&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/NetTxSession.cs Tue Apr 26 21:31:05 2011
@@ -92,18 +92,14 @@ namespace Apache.NMS.ActiveMQ
if (!TransactionContext.InNetTransaction && Transaction.Current != null)
{
Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
-
EnrollInSpecifiedTransaction(Transaction.Current);
}
+
+ TransactionContext.SyncRoot.ReleaseMutex();
}
private void EnrollInSpecifiedTransaction(Transaction tx)
{
- // If an Async DTC operation is in progress such as Commit or Rollback
- // we need to let it complete before deciding if the Session is in a TX
- // otherwise we might error out for no reason.
- //TransactionContext.DtcWaitHandle.WaitOne();
-
if(TransactionContext.InNetTransaction)
{
Tracer.Warn("Enlist attempted while a Net TX was Active.");
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Session.cs?rev=1096919&r1=1096918&r2=1096919&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Session.cs Tue Apr 26 21:31:05 2011
@@ -315,8 +315,11 @@ namespace Apache.NMS.ActiveMQ
{
TransactionContext.SyncRoot.ReleaseMutex();
- //this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));)
+ // Must wait for all the DTC operations to complete before
+ // moving on from this close call.
this.transactionContext.DtcWaitHandle.WaitOne();
+
+ TransactionContext.SyncRoot.WaitOne();
}
TransactionContext.SyncRoot.ReleaseMutex();
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/TransactionContext.cs?rev=1096919&r1=1096918&r2=1096919&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/TransactionContext.cs Tue Apr 26 21:31:05 2011
@@ -243,6 +243,7 @@ namespace Apache.NMS.ActiveMQ
lock (syncObject)
{
this.netTxState = TxState.Active;
+ dtcControlEvent.Reset();
Tracer.Debug("Begin notification received");
@@ -306,8 +307,6 @@ namespace Apache.NMS.ActiveMQ
try
{
- dtcControlEvent.Reset();
-
Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
BeforeEnd();
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1096919&r1=1096918&r2=1096919&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs Tue Apr 26 21:31:05 2011
@@ -23,7 +23,6 @@ using System.Threading;
using System.Transactions;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Transport.Tcp;
-using Apache.NMS.Util;
using NUnit.Framework;
namespace Apache.NMS.ActiveMQ.Test
@@ -238,6 +237,77 @@ namespace Apache.NMS.ActiveMQ.Test
}
[Test]
+ public void TestRedeliveredNoComplete()
+ {
+ const int messageCount = 300;
+ const int receiveCount = 150;
+
+ // enqueue several messages
+ PurgeDatabase();
+ PurgeAndFillQueue(messageCount);
+
+ INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+ using (INetTxConnection connection = factory.CreateNetTxConnection())
+ {
+ // allow no redelivery so that a message immediately goes to the DLQ if the first read fails
+ connection.RedeliveryPolicy.MaximumRedeliveries = 0;
+ connection.Start();
+
+ // receive half of total messages
+ using (INetTxSession session = connection.CreateNetTxSession())
+ {
+ IQueue queue = session.GetQueue(testQueueName);
+
+ // read message from queue and insert into db table
+ using (IMessageConsumer consumer = session.CreateConsumer(queue))
+ {
+ for (int i = 0; i < receiveCount; i++)
+ {
+ using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+ using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+ using (SqlCommand sqlInsertCommand = new SqlCommand())
+ {
+ sqlConnection.Open();
+ sqlInsertCommand.Connection = sqlConnection;
+
+ ITextMessage message =
+ consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage;
+ sqlInsertCommand.CommandText =
+ string.Format("INSERT INTO {0} VALUES ({1})", testTable,
+ Convert.ToInt32(message.Text));
+ sqlInsertCommand.ExecuteNonQuery();
+ }
+ }
+ }
+
+ session.Close();
+ }
+ }
+
+ Tracer.Debug("First stage ok");
+
+ // check that the other messages have status redelivered = false
+ IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+ using (IConnection connection = checkFactory.CreateConnection())
+ {
+ connection.Start();
+
+ using (ISession session = connection.CreateSession())
+ using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName)))
+ {
+ IEnumerator enumerator = browser.GetEnumerator();
+ while (enumerator.MoveNext())
+ {
+ IMessage msg = enumerator.Current as IMessage;
+ Assert.IsNotNull(msg, "message is not in the queue!");
+ Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!");
+ }
+ }
+ }
+ }
+
+ [Test]
public void TestRecoveryAfterCommitFailsBeforeSent()
{
// Test initialize - Fills in queue with data to send and clears the DB.