You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/08 16:19:20 UTC

svn commit: r1090261 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/MessageConsumer.cs main/csharp/Session.cs main/csharp/TransactionContext.cs test/csharp/DtcConsumerTransactionsTest.cs test/csharp/NetTxTransactionTest.cs

Author: tabish
Date: Fri Apr  8 14:19:19 2011
New Revision: 1090261

URL: http://svn.apache.org/viewvc?rev=1090261&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-326

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Fri Apr  8 14:19:19 2011
@@ -42,7 +42,7 @@ namespace Apache.NMS.ActiveMQ
 		private readonly MessageDispatchChannel unconsumedMessages;
 		private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
 		private readonly ConsumerInfo info;
-		private Session session;
+		private readonly Session session;
 
 		private MessageAck pendingAck = null;
 
@@ -62,9 +62,6 @@ namespace Apache.NMS.ActiveMQ
 
         private Exception failureError;
 
-		private const int DEFAULT_REDELIVERY_DELAY = 0;
-		private const int DEFAULT_MAX_REDELIVERIES = 5;
-
 		private event MessageListener listener;
 
 		private IRedeliveryPolicy redeliveryPolicy;
@@ -1227,12 +1224,16 @@ namespace Apache.NMS.ActiveMQ
 
 			public void AfterCommit()
 			{
-				this.consumer.DoClose();
+                Tracer.DebugFormat("ConsumerCloseSynchronization - AfterCommit Called for Consumer {0}.",
+                                   this.consumer.ConsumerId);
+                this.consumer.DoClose();
 			}
 
 			public void AfterRollback()
 			{
-				this.consumer.DoClose();
+                Tracer.DebugFormat("ConsumerCloseSynchronization - AfterRollback Called for Consumer {0}.",
+                                   this.consumer.ConsumerId);
+                this.consumer.DoClose();
 			}
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Apr  8 14:19:19 2011
@@ -41,7 +41,7 @@ namespace Apache.NMS.ActiveMQ
         private readonly SessionExecutor executor;
         private readonly TransactionContext transactionContext;
 
-        private Connection connection;
+        private readonly Connection connection;
 
         private bool dispatchAsync;
         private bool exclusive;
@@ -293,31 +293,29 @@ namespace Apache.NMS.ActiveMQ
 
         public void Close()
         {
-            lock(myLock)
+            if(this.closed)
             {
-                if(this.closed)
-                {
-                    return;
-                }
+                return;
+            }
 
-                try
+            try
+            {
+                if(transactionContext.InNetTransaction)
                 {
-                    if(transactionContext.InNetTransaction)
-                    {
-                        this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));
-                    }
-                    else
-                    {
-                        Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId.ToString());
-                        DoClose();
-                        Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId.ToString());
-                    }
+                    this.transactionContext.AddSynchronization(new SessionCloseSynchronization(this));
+                    this.transactionContext.DtcWaitHandle.WaitOne();
                 }
-                catch(Exception ex)
+                else
                 {
-                    Tracer.ErrorFormat("Error during session close: {0}", ex);
+                    Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId);
+                    DoClose();
+                    Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId);
                 }
             }
+            catch(Exception ex)
+            {
+                Tracer.ErrorFormat("Error during session close: {0}", ex);
+            }
         }
 
         internal void DoClose()
@@ -331,9 +329,16 @@ namespace Apache.NMS.ActiveMQ
 		
         internal void Shutdown()
         {
+            Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.info.SessionId);
+
+            if(this.closed)
+            {
+                return;
+            }
+
             lock(myLock)
             {
-                if(this.closed)
+                if(this.closed || this.closing)
                 {
                     return;
                 }
@@ -366,8 +371,8 @@ namespace Apache.NMS.ActiveMQ
                     }
                     producers.Clear();
 
-                    // If in a transaction roll it back
-                    if(this.IsTransacted && this.transactionContext.InLocalTransaction)
+                    // If in a local transaction we just roll back at this point.
+                    if (this.IsTransacted && this.transactionContext.InLocalTransaction)
                     {
                         try
                         {
@@ -954,12 +959,14 @@ namespace Apache.NMS.ActiveMQ
 
             public void AfterCommit()
             {
-                this.session.DoClose();
+                Tracer.Debug("SessionCloseSynchronization AfterCommit called for Session: " + session.SessionId);
+                session.DoClose();
             }
 
             public void AfterRollback()
             {
-                this.session.DoClose();
+                Tracer.Debug("SessionCloseSynchronization AfterRollback called for Session: " + session.SessionId);
+                session.DoClose();
             }
         }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Fri Apr  8 14:19:19 2011
@@ -220,41 +220,51 @@ namespace Apache.NMS.ActiveMQ
                 throw new TransactionInProgressException("A Transaction is already in Progress");
             }
 
-            Guid rmId = ResourceManagerGuid;
+            dtcControlEvent.Reset();
 
-            // Enlist this object in the transaction.
-            this.currentEnlistment =
-                transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
+            try
+            {
+                Guid rmId = ResourceManagerGuid;
 
-            Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
+                // Enlist this object in the transaction.
+                this.currentEnlistment =
+                    transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);
 
-            TransactionInformation txInfo = transaction.TransactionInformation;
+                Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);
 
-            XATransactionId xaId = new XATransactionId();
-            this.transactionId = xaId;
+                TransactionInformation txInfo = transaction.TransactionInformation;
 
-            if(txInfo.DistributedIdentifier != Guid.Empty)
-            {
-                xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
-                xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
-            }
-            else
-            {
-                xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
-                xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
-            }
+                XATransactionId xaId = new XATransactionId();
+                this.transactionId = xaId;
 
-            // Now notify the broker that a new XA'ish transaction has started.
-            TransactionInfo info = new TransactionInfo();
-            info.ConnectionId = this.connection.ConnectionId;
-            info.TransactionId = this.transactionId;
-            info.Type = (int) TransactionType.Begin;
+                if (txInfo.DistributedIdentifier != Guid.Empty)
+                {
+                    xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
+                    xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+                }
+                else
+                {
+                    xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+                    xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+                }
+
+                // Now notify the broker that a new XA'ish transaction has started.
+                TransactionInfo info = new TransactionInfo();
+                info.ConnectionId = this.connection.ConnectionId;
+                info.TransactionId = this.transactionId;
+                info.Type = (int) TransactionType.Begin;
 
-            this.session.Connection.Oneway(info);
+                this.session.Connection.Oneway(info);
 
-            if(Tracer.IsDebugEnabled)
+                if (Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
+                }
+            }
+            catch(Exception)
             {
-                Tracer.Debug("Began XA'ish Transaction:" + xaId.GlobalTransactionId.ToString());
+                dtcControlEvent.Set();
+                throw;
             }
         }
 
@@ -262,8 +272,6 @@ namespace Apache.NMS.ActiveMQ
         {
             try
             {
-                dtcControlEvent.Reset();
-
                 Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);
 				
                 BeforeEnd();
@@ -302,6 +310,9 @@ namespace Apache.NMS.ActiveMQ
 
                     // Done so commit won't be called.
                     AfterCommit();
+
+                    // A Read-Only TX is considered closed at this point, DTC won't call us again.
+                    this.dtcControlEvent.Set();
                 }
                 else
                 {
@@ -329,19 +340,14 @@ namespace Apache.NMS.ActiveMQ
 
                 this.currentEnlistment = null;
                 this.transactionId = null;
-            }
-            finally
-            {
                 this.dtcControlEvent.Set();
-            }   
+            }
         }
 
         public void Commit(Enlistment enlistment)
         {
             try
             {
-                dtcControlEvent.Reset();
-
                 Tracer.Debug("Commit notification received for TX id: " + this.transactionId);
 
                 if (this.transactionId != null)
@@ -398,8 +404,6 @@ namespace Apache.NMS.ActiveMQ
         {
             try
             {
-                dtcControlEvent.Reset();
-
                 Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);
 
                 if (this.transactionId != null)
@@ -450,9 +454,7 @@ namespace Apache.NMS.ActiveMQ
         public void Rollback(Enlistment enlistment)
         {
             try
-            {
-                dtcControlEvent.Reset();
-                
+            {                
                 Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);
 
                 if (this.transactionId != null)
@@ -515,8 +517,6 @@ namespace Apache.NMS.ActiveMQ
         {
             try
             {
-                dtcControlEvent.Reset();
-                
                 Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);
 				
                 BeforeEnd();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs Fri Apr  8 14:19:19 2011
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Collections;
 using System.Data.SqlClient;
 using System.IO;
 using System.Threading;
@@ -32,6 +33,68 @@ namespace Apache.NMS.ActiveMQ.Test
     class DtcConsumerTransactionsTest : DtcTransactionsTestSupport
     {
         [Test]
+        public void TestRedelivered()
+        {
+            // enqueue several messages
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            // receive just one
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.Start();
+
+                using (INetTxSession session = connection.CreateNetTxSession())
+                {
+                    IQueue queue = session.GetQueue(testQueueName);
+
+                    // read message from queue and insert into db table
+                    using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                        using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                        using (SqlCommand sqlInsertCommand = new SqlCommand())
+                        {
+                            sqlConnection.Open();
+                            sqlInsertCommand.Connection = sqlConnection;
+
+                            ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage;
+                            sqlInsertCommand.CommandText =
+                                string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(message.Text));
+                            sqlInsertCommand.ExecuteNonQuery();
+
+                            scoped.Complete();
+                        }
+                    }
+
+                    session.Close();
+                }
+            }
+
+            // check that others message have status redelivered = false
+            IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection connection = checkFactory.CreateConnection())
+            {
+                connection.Start();
+
+                using (ISession session = connection.CreateSession())
+                using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName)))
+                {
+                    IEnumerator enumerator = browser.GetEnumerator();
+
+                    while (enumerator.MoveNext())
+                    {
+                        IMessage msg = enumerator.Current as IMessage;
+                        Assert.IsNotNull(msg, "message is not in the queue!");
+                        Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!");
+                    }
+                }
+            }
+        }
+
+        [Test]
         public void TestRecoveryAfterCommitFailsBeforeSent()
         {
             // Test initialize - Fills in queue with data to send and clears the DB.

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs?rev=1090261&r1=1090260&r2=1090261&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxTransactionTest.cs Fri Apr  8 14:19:19 2011
@@ -21,8 +21,6 @@ using System.Transactions;
 
 using NUnit.Framework;
 using Apache.NMS.Test;
-using Apache.NMS.Util;
-using Apache.NMS.ActiveMQ;
 
 namespace Apache.NMS.ActiveMQ.Test
 {
@@ -236,10 +234,10 @@ namespace Apache.NMS.ActiveMQ.Test
                                 producer.Send(session.CreateTextMessage("Hello World"));
                             }
 
-                            session.Close();
-
                             scoped.Complete();
                         }
+
+                        session.Close();
                     }
                 }
 
@@ -255,10 +253,10 @@ namespace Apache.NMS.ActiveMQ.Test
                                 Assert.IsNotNull(msg, "Message was null for index: " + i);
                             }
 
-                            session.Close();
-
                             scoped.Complete();
                         }
+
+                        session.Close();
                     }
                 }