You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/26 23:27:32 UTC

svn commit: r1096916 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/MessageConsumer.cs main/csharp/NetTxSession.cs main/csharp/Session.cs main/csharp/TransactionContext.cs test/csharp/DtcConsumerTransactionsTest.cs

Author: tabish
Date: Tue Apr 26 21:27:32 2011
New Revision: 1096916

URL: http://svn.apache.org/viewvc?rev=1096916&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-326

Fixes some more edge cases on TX rollback and message consumption.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1096916&r1=1096915&r2=1096916&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Tue Apr 26 21:27:32 2011
@@ -330,11 +330,15 @@ namespace Apache.NMS.ActiveMQ
 			{
 				if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
 				{
-					this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
+                    Tracer.DebugFormat("Consumer {0} Registering new ConsumerCloseSynchronization",
+                                       this.info.ConsumerId);
+                    this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
 				}
 				else
 				{
-					this.DoClose();
+                    Tracer.DebugFormat("Consumer {0} No Active TX closing normally.",
+                                       this.info.ConsumerId);
+                    this.DoClose();
 				}
 			}
 		}
@@ -765,12 +769,18 @@ namespace Apache.NMS.ActiveMQ
 			{
                 if (this.session.IsTransacted)
                 {
-                    // In the case where the consumer is operating in concert with 
-                    // a distributed TX manager we need to wait whenever the TX
-                    // is controlled by the DTC as it completes all operations
-                    // async and we cannot start consumption again until all its
-                    // tasks have completed.
-                    this.session.TransactionContext.DtcWaitHandle.WaitOne();
+                    this.session.TransactionContext.SyncRoot.WaitOne();
+
+                    // In the case where the consumer is operating in concert with a
+                    // distributed TX manager we need to wait whenever the TX is being
+                    // controlled by the DTC as it completes all operations async and
+                    // we cannot start consumption again until all its tasks have completed.)
+                    if (this.session.TransactionContext.InNetTransaction && 
+                        this.session.TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+                    {
+                        this.session.TransactionContext.SyncRoot.ReleaseMutex();
+                        this.session.TransactionContext.DtcWaitHandle.WaitOne();                        
+                    }
                 }
 
 			    lock(this.dispatchedMessages)
@@ -780,7 +790,7 @@ namespace Apache.NMS.ActiveMQ
 
 				if(this.session.IsTransacted)
 				{
-				    this.session.TransactionContext.DtcWaitHandle.WaitOne();
+				    //this.session.TransactionContext.DtcWaitHandle.WaitOne();
 					this.AckLater(dispatch, AckType.DeliveredAck);
 				}
 			}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1096916&r1=1096915&r2=1096916&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs Tue Apr 26 21:27:32 2011
@@ -92,18 +92,14 @@ namespace Apache.NMS.ActiveMQ
             if (!TransactionContext.InNetTransaction && Transaction.Current != null)
             {
                 Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
-
                 EnrollInSpecifiedTransaction(Transaction.Current);
             }
+
+            TransactionContext.SyncRoot.ReleaseMutex();
         }
 
         private void EnrollInSpecifiedTransaction(Transaction tx)
         {
-            // If an Async DTC operation is in progress such as Commit or Rollback
-            // we need to let it complete before deciding if the Session is in a TX
-            // otherwise we might error out for no reason.
-            //TransactionContext.DtcWaitHandle.WaitOne();
-
             if(TransactionContext.InNetTransaction)
             {
                 Tracer.Warn("Enlist attempted while a Net TX was Active.");

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1096916&r1=1096915&r2=1096916&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Tue Apr 26 21:27:32 2011
@@ -306,8 +306,11 @@ namespace Apache.NMS.ActiveMQ
                 {
                     TransactionContext.SyncRoot.ReleaseMutex();
 
-                    //this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));)
+                    // Must wait for all the DTC operations to complete before
+                    // moving on from this close call.
                     this.transactionContext.DtcWaitHandle.WaitOne();
+
+                    TransactionContext.SyncRoot.WaitOne();
                 }
 
                 TransactionContext.SyncRoot.ReleaseMutex();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1096916&r1=1096915&r2=1096916&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Tue Apr 26 21:27:32 2011
@@ -243,6 +243,7 @@ namespace Apache.NMS.ActiveMQ
             lock (syncObject)
             {
                 this.netTxState = TxState.Active;
+                dtcControlEvent.Reset();
 
                 Tracer.Debug("Begin notification received");
 
@@ -306,8 +307,6 @@ namespace Apache.NMS.ActiveMQ
 
                 try
                 {
-                    dtcControlEvent.Reset();
-
                     Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
 
                     BeforeEnd();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1096916&r1=1096915&r2=1096916&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs Tue Apr 26 21:27:32 2011
@@ -23,7 +23,6 @@ using System.Threading;
 using System.Transactions;
 using Apache.NMS.ActiveMQ.Transport;
 using Apache.NMS.ActiveMQ.Transport.Tcp;
-using Apache.NMS.Util;
 using NUnit.Framework;
 
 namespace Apache.NMS.ActiveMQ.Test
@@ -238,6 +237,77 @@ namespace Apache.NMS.ActiveMQ.Test
         }
 
         [Test]
+        public void TestRedeliveredNoComplete()
+        {
+            const int messageCount = 300;
+            const int receiveCount = 150;
+
+            // enqueue several messages
+            PurgeDatabase();
+            PurgeAndFillQueue(messageCount);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                // allow no redelivery so that message immediatly goes to the DLQ if first read fails
+                connection.RedeliveryPolicy.MaximumRedeliveries = 0;
+                connection.Start();
+
+                // receive half of total messages
+                using (INetTxSession session = connection.CreateNetTxSession())
+                {
+                    IQueue queue = session.GetQueue(testQueueName);
+
+                    // read message from queue and insert into db table
+                    using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        for (int i = 0; i < receiveCount; i++)
+                        {
+                            using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                            using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                            using (SqlCommand sqlInsertCommand = new SqlCommand())
+                            {
+                                sqlConnection.Open();
+                                sqlInsertCommand.Connection = sqlConnection;
+
+                                ITextMessage message =
+                                    consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage;
+                                sqlInsertCommand.CommandText =
+                                    string.Format("INSERT INTO {0} VALUES ({1})", testTable,
+                                                  Convert.ToInt32(message.Text));
+                                sqlInsertCommand.ExecuteNonQuery();
+                            }
+                        }
+                    }
+
+                    session.Close();
+                }
+            }
+
+            Tracer.Debug("First stage ok");
+
+            // check that others message have status redelivered = false
+            IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection connection = checkFactory.CreateConnection())
+            {
+                connection.Start();
+
+                using (ISession session = connection.CreateSession())
+                using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName)))
+                {
+                    IEnumerator enumerator = browser.GetEnumerator();
+                    while (enumerator.MoveNext())
+                    {
+                        IMessage msg = enumerator.Current as IMessage;
+                        Assert.IsNotNull(msg, "message is not in the queue!");
+                        Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!");
+                    }
+                }
+            }
+        }
+
+        [Test]
         public void TestRecoveryAfterCommitFailsBeforeSent()
         {
             // Test initialize - Fills in queue with data to send and clears the DB.