You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/08 16:19:20 UTC
svn commit: r1090261 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src:
main/csharp/MessageConsumer.cs main/csharp/Session.cs
main/csharp/TransactionContext.cs test/csharp/DtcConsumerTransactionsTest.cs
test/csharp/NetTxTransactionTest.cs
Author: tabish
Date: Fri Apr 8 14:19:19 2011
New Revision: 1090261
URL: http://svn.apache.org/viewvc?rev=1090261&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-326
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Fri Apr 8 14:19:19 2011
@@ -42,7 +42,7 @@ namespace Apache.NMS.ActiveMQ
private readonly MessageDispatchChannel unconsumedMessages;
private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
private readonly ConsumerInfo info;
- private Session session;
+ private readonly Session session;
private MessageAck pendingAck = null;
@@ -62,9 +62,6 @@ namespace Apache.NMS.ActiveMQ
private Exception failureError;
- private const int DEFAULT_REDELIVERY_DELAY = 0;
- private const int DEFAULT_MAX_REDELIVERIES = 5;
-
private event MessageListener listener;
private IRedeliveryPolicy redeliveryPolicy;
@@ -1227,12 +1224,16 @@ namespace Apache.NMS.ActiveMQ
public void AfterCommit()
{
- this.consumer.DoClose();
+ Tracer.DebugFormat("ConsumerCloseSynchronization - AfterCommit Called for Consumer {0}.",
+ this.consumer.ConsumerId);
+ this.consumer.DoClose();
}
public void AfterRollback()
{
- this.consumer.DoClose();
+ Tracer.DebugFormat("ConsumerCloseSynchronization - AfterRollback Called for Consumer {0}.",
+ this.consumer.ConsumerId);
+ this.consumer.DoClose();
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Apr 8 14:19:19 2011
@@ -41,7 +41,7 @@ namespace Apache.NMS.ActiveMQ
private readonly SessionExecutor executor;
private readonly TransactionContext transactionContext;
- private Connection connection;
+ private readonly Connection connection;
private bool dispatchAsync;
private bool exclusive;
@@ -293,31 +293,29 @@ namespace Apache.NMS.ActiveMQ
public void Close()
{
- lock(myLock)
+ if(this.closed)
{
- if(this.closed)
- {
- return;
- }
+ return;
+ }
- try
+ try
+ {
+ if(transactionContext.InNetTransaction)
{
- if(transactionContext.InNetTransaction)
- {
- this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));
- }
- else
- {
- Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId.ToString());
- DoClose();
- Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId.ToString());
- }
+ this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));
+ this.transactionContext.DtcWaitHandle.WaitOne();
}
- catch(Exception ex)
+ else
{
- Tracer.ErrorFormat("Error during session close: {0}", ex);
+ Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId);
+ DoClose();
+ Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId);
}
}
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Error during session close: {0}", ex);
+ }
}
internal void DoClose()
@@ -331,9 +329,16 @@ namespace Apache.NMS.ActiveMQ
internal void Shutdown()
{
+ Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.info.SessionId);
+
+ if(this.closed)
+ {
+ return;
+ }
+
lock(myLock)
{
- if(this.closed)
+ if(this.closed || this.closing)
{
return;
}
@@ -366,8 +371,8 @@ namespace Apache.NMS.ActiveMQ
}
producers.Clear();
- // If in a transaction roll it back
- if(this.IsTransacted && this.transactionContext.InLocalTransaction)
+ // If in a local transaction we just roll back at this point.
+ if (this.IsTransacted && this.transactionContext.InLocalTransaction)
{
try
{
@@ -954,12 +959,14 @@ namespace Apache.NMS.ActiveMQ
public void AfterCommit()
{
- this.session.DoClose();
+ Tracer.Debug("SessionCloseSynchronization AfterCommit called for Session: " + session.SessionId);
+ session.DoClose();
}
public void AfterRollback()
{
- this.session.DoClose();
+ Tracer.Debug("SessionCloseSynchronization AfterRollback called for Session: " + session.SessionId);
+ session.DoClose();
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Fri Apr 8 14:19:19 2011
@@ -220,41 +220,51 @@ namespace Apache.NMS.ActiveMQ
throw new TransactionInProgressException("A Transaction is already in Progress");
}
- Guid rmId = ResourceManagerGuid;
+ dtcControlEvent.Reset();
- // Enlist this object in the transaction.
- this.currentEnlistment =
- transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
+ try
+ {
+ Guid rmId = ResourceManagerGuid;
- Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
+ // Enlist this object in the transaction.
+ this.currentEnlistment =
+ transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
- TransactionInformation txInfo = transaction.TransactionInformation;
+ Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
- XATransactionId xaId = new XATransactionId();
- this.transactionId = xaId;
+ TransactionInformation txInfo = transaction.TransactionInformation;
- if(txInfo.DistributedIdentifier != Guid.Empty)
- {
- xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
- xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
- }
- else
- {
- xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
- xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
- }
+ XATransactionId xaId = new XATransactionId();
+ this.transactionId = xaId;
- // Now notify the broker that a new XA'ish transaction has started.
- TransactionInfo info = new TransactionInfo();
- info.ConnectionId = this.connection.ConnectionId;
- info.TransactionId = this.transactionId;
- info.Type = (int) TransactionType.Begin;
+ if (txInfo.DistributedIdentifier != Guid.Empty)
+ {
+ xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
+ xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+ }
+ else
+ {
+ xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+ xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+ }
+
+ // Now notify the broker that a new XA'ish transaction has started.
+ TransactionInfo info = new TransactionInfo();
+ info.ConnectionId = this.connection.ConnectionId;
+ info.TransactionId = this.transactionId;
+ info.Type = (int) TransactionType.Begin;
- this.session.Connection.Oneway(info);
+ this.session.Connection.Oneway(info);
- if(Tracer.IsDebugEnabled)
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
+ }
+ }
+ catch(Exception)
{
- Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
+ dtcControlEvent.Set();
+ throw;
}
}
@@ -262,8 +272,6 @@ namespace Apache.NMS.ActiveMQ
{
try
{
- dtcControlEvent.Reset();
-
Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
BeforeEnd();
@@ -302,6 +310,9 @@ namespace Apache.NMS.ActiveMQ
// Done so commit won't be called.
AfterCommit();
+
+ // A Read-Only TX is considered closed at this point, DTC won't call us again.
+ this.dtcControlEvent.Set();
}
else
{
@@ -329,19 +340,14 @@ namespace Apache.NMS.ActiveMQ
this.currentEnlistment = null;
this.transactionId = null;
- }
- finally
- {
this.dtcControlEvent.Set();
- }
+ }
}
public void Commit(Enlistment enlistment)
{
try
{
- dtcControlEvent.Reset();
-
Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
if (this.transactionId != null)
@@ -398,8 +404,6 @@ namespace Apache.NMS.ActiveMQ
{
try
{
- dtcControlEvent.Reset();
-
Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
if (this.transactionId != null)
@@ -450,9 +454,7 @@ namespace Apache.NMS.ActiveMQ
public void Rollback(Enlistment enlistment)
{
try
- {
- dtcControlEvent.Reset();
-
+ {
Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
if (this.transactionId != null)
@@ -515,8 +517,6 @@ namespace Apache.NMS.ActiveMQ
{
try
{
- dtcControlEvent.Reset();
-
Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
BeforeEnd();
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs Fri Apr 8 14:19:19 2011
@@ -16,6 +16,7 @@
*/
using System;
+using System.Collections;
using System.Data.SqlClient;
using System.IO;
using System.Threading;
@@ -32,6 +33,68 @@ namespace Apache.NMS.ActiveMQ.Test
class DtcConsumerTransactionsTest : DtcTransactionsTestSupport
{
[Test]
+ public void TestRedelivered()
+ {
+ // enqueue several messages
+ PurgeDatabase();
+ PurgeAndFillQueue();
+
+ // receive just one
+ INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+ using (INetTxConnection connection = factory.CreateNetTxConnection())
+ {
+ connection.Start();
+
+ using (INetTxSession session = connection.CreateNetTxSession())
+ {
+ IQueue queue = session.GetQueue(testQueueName);
+
+ // read message from queue and insert into db table
+ using (IMessageConsumer consumer = session.CreateConsumer(queue))
+ {
+ using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+ using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+ using (SqlCommand sqlInsertCommand = new SqlCommand())
+ {
+ sqlConnection.Open();
+ sqlInsertCommand.Connection = sqlConnection;
+
+ ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage;
+ sqlInsertCommand.CommandText =
+ string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(message.Text));
+ sqlInsertCommand.ExecuteNonQuery();
+
+ scoped.Complete();
+ }
+ }
+
+ session.Close();
+ }
+ }
+
+ // check that the other messages have status redelivered = false
+ IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+ using (IConnection connection = checkFactory.CreateConnection())
+ {
+ connection.Start();
+
+ using (ISession session = connection.CreateSession())
+ using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName)))
+ {
+ IEnumerator enumerator = browser.GetEnumerator();
+
+ while (enumerator.MoveNext())
+ {
+ IMessage msg = enumerator.Current as IMessage;
+ Assert.IsNotNull(msg, "message is not in the queue!");
+ Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!");
+ }
+ }
+ }
+ }
+
+ [Test]
public void TestRecoveryAfterCommitFailsBeforeSent()
{
// Test initialize - Fills in queue with data to send and clears the DB.
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs Fri Apr 8 14:19:19 2011
@@ -21,8 +21,6 @@ using System.Transactions;
using NUnit.Framework;
using Apache.NMS.Test;
-using Apache.NMS.Util;
-using Apache.NMS.ActiveMQ;
namespace Apache.NMS.ActiveMQ.Test
{
@@ -236,10 +234,10 @@ namespace Apache.NMS.ActiveMQ.Test
producer.Send(session.CreateTextMessage("Hello World"));
}
- session.Close();
-
scoped.Complete();
}
+
+ session.Close();
}
}
@@ -255,10 +253,10 @@ namespace Apache.NMS.ActiveMQ.Test
Assert.IsNotNull(msg, "Message was null for index: " + i);
}
- session.Close();
-
scoped.Complete();
}
+
+ session.Close();
}
}