You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Remo Gloor (JIRA)" <ji...@apache.org> on 2013/02/14 18:45:13 UTC

[jira] [Updated] (AMQNET-412) Messages are enlisted to the wrong transaction

     [ https://issues.apache.org/jira/browse/AMQNET-412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Remo Gloor updated AMQNET-412:
------------------------------

    Description: 
Under load active mq enlists a message to a previous transactions. This leads to very strange behaviors:
- Database is updated and message is rolled back
- Message is completed but database rolledback

All this results in an invalid system state making. DTC is not usable this way.

Analysis of the source code have shown that the problem is in NetTxSession.DoStartTransaction There it checks if a .NET Transaction in the TransactionContext. In this case it adds the message to that transaction. But this can be the previous transaction because DTC 2-PhaseCommit is asyncronous. It needs to check if the Current Transaction is the same as one before and wait if they do not match.

The following applacation demonstrates the problem when enough messages are processed E.g. enqueue 100 msg in foo.bar. It is basically TestRedeliveredCase3 but with half of the messages failing. 

Whenever a SinglePhaseCommit occurs in the TestSinglePhaseCommit this means the database would be commited in an own transaction. 

    class Program
    {
        private static INetTxSession activeMqSession;
        private static IMessageConsumer consumer;
        private static INetTxConnection connection;

        static void Main(string[] args)
        {
            using (connection = CreateActiveMqConnection())
            using (activeMqSession = connection.CreateNetTxSession())
            using (consumer = activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession, "queue://foo.bar")))
            {
                connection.Start();

                while (true)
                {
                    try
                    {
                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
                        {
                            IMessage msg = null;
                            while (msg == null)
                            {
                                msg = consumer.ReceiveNoWait();
                            }

                            OnMessage(msg);
                            scoped.Complete();
                        }
                    }
                    catch(Exception exception) {}
                }
            }
        }

        private static INetTxConnection CreateActiveMqConnection()
        {
            var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional
            };

            return connectionFactory.CreateNetTxConnection();
        }

        private static void OnMessage(IMessage message)
        {
            var x = new TestSinglePhaseCommit();

            var session2 = activeMqSession;
            {
                Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);

                using (var producer = session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz")))
                {
                    producer.Send(new ActiveMQTextMessage("foo"));
                }

                if (new Random().Next(2) == 0) throw new Exception();
            }
        }
    }

    internal class TestSinglePhaseCommit : ISinglePhaseNotification
    {
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            Console.WriteLine("Tx Prepare");
            preparingEnlistment.Prepared();
        }
        public void Commit(Enlistment enlistment)
        {
            Console.WriteLine("Tx Commit");
            enlistment.Done();
        }

        public void Rollback(Enlistment enlistment)
        {
            Console.WriteLine("Tx Rollback");
            enlistment.Done();
        }
        public void InDoubt(Enlistment enlistment)
        {
            Console.WriteLine("Tx InDoubt");
            enlistment.Done();
        }
        public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
        {
            Console.WriteLine("Tx SinglePhaseCommit");
            singlePhaseEnlistment.Committed();
        }
    }



  was:
Under load active mq enlists a message to a previous transactions. This leads to very strange behaviors:
- Database is updated and message is rolled back
- Message is completed but database rolledback

All this results in an invalid system state making. DTC is not usable this way.

Analysis of the source code have shown that the problem is in NetTxSession.DoStartTransaction There it checks if a .NET Transaction in the TransactionContext. In this case it adds the message to that transaction. But this can be the previous transaction because DTC 2-PhaseCommit is asyncronous. It needs to check if the Current Transaction is the same as one before and wait if they do not match.

The following applacation demonstrates the problem when enough messages are processed E.g. enqueue 100 msg in foo.bar. It is basically TestRedeliveredCase3 but with half of the messages failing. 

Whenever a SinglePhaseCommit occurs in the TestSinglePhaseCommit this means the database would be commited in an own transaction. 

    class Program
    {
        private static INetTxSession activeMqSession;
        private static IMessageConsumer consumer;
        private static INetTxConnection connection;

        static void Main(string[] args)
        {
            using (connection = CreateActiveMqConnection())
            using (activeMqSession = connection.CreateNetTxSession())
            using (consumer = activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession, "queue://foo.bar")))
            {
                connection.Start();

                while (true)
                {
                    try
                    {
                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
                        {
                            IMessage msg = null;
                            while (msg == null)
                            {
                                msg = consumer.ReceiveNoWait();
                            }

                            OnMessage(msg);
                            scoped.Complete();
                        }
                    }
                    catch(Exception exception) {}
                }
            }
        }

        private static INetTxConnection CreateActiveMqConnection()
        {
            var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional
            };

            return connectionFactory.CreateNetTxConnection();
        }

        private static void OnMessage(IMessage message)
        {
            var x = new TestSinglePhaseCommit();

            var session2 = activeMqSession;
            {
                Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);

                // The proble occurs only if a message is sent using the same session like the receiver
                using (var producer = session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz")))
                {
                    producer.Send(new ActiveMQTextMessage("foo"));
                }

                if (new Random().Next(2) == 0) throw new Exception();
            }
        }
    }

    internal class TestSinglePhaseCommit : ISinglePhaseNotification
    {
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            Console.WriteLine("Tx Prepare");
            preparingEnlistment.Prepared();
        }
        public void Commit(Enlistment enlistment)
        {
            Console.WriteLine("Tx Commit");
            enlistment.Done();
        }

        public void Rollback(Enlistment enlistment)
        {
            Console.WriteLine("Tx Rollback");
            enlistment.Done();
        }
        public void InDoubt(Enlistment enlistment)
        {
            Console.WriteLine("Tx InDoubt");
            enlistment.Done();
        }
        public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
        {
            Console.WriteLine("Tx SinglePhaseCommit");
            singlePhaseEnlistment.Committed();
        }
    }



    
> Messages are enlisted to the wrong transaction
> ----------------------------------------------
>
>                 Key: AMQNET-412
>                 URL: https://issues.apache.org/jira/browse/AMQNET-412
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>         Environment: Apache.NMS.ActiveMq 1.5.7
>            Reporter: Remo Gloor
>            Assignee: Jim Gomes
>            Priority: Critical
>
> Under load active mq enlists a message to a previous transactions. This leads to very strange behaviors:
> - Database is updated and message is rolled back
> - Message is completed but database rolledback
> All this results in an invalid system state making. DTC is not usable this way.
> Analysis of the source code have shown that the problem is in NetTxSession.DoStartTransaction There it checks if a .NET Transaction in the TransactionContext. In this case it adds the message to that transaction. But this can be the previous transaction because DTC 2-PhaseCommit is asyncronous. It needs to check if the Current Transaction is the same as one before and wait if they do not match.
> The following applacation demonstrates the problem when enough messages are processed E.g. enqueue 100 msg in foo.bar. It is basically TestRedeliveredCase3 but with half of the messages failing. 
> Whenever a SinglePhaseCommit occurs in the TestSinglePhaseCommit this means the database would be commited in an own transaction. 
>     class Program
>     {
>         private static INetTxSession activeMqSession;
>         private static IMessageConsumer consumer;
>         private static INetTxConnection connection;
>         static void Main(string[] args)
>         {
>             using (connection = CreateActiveMqConnection())
>             using (activeMqSession = connection.CreateNetTxSession())
>             using (consumer = activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession, "queue://foo.bar")))
>             {
>                 connection.Start();
>                 while (true)
>                 {
>                     try
>                     {
>                         using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
>                         {
>                             IMessage msg = null;
>                             while (msg == null)
>                             {
>                                 msg = consumer.ReceiveNoWait();
>                             }
>                             OnMessage(msg);
>                             scoped.Complete();
>                         }
>                     }
>                     catch(Exception exception) {}
>                 }
>             }
>         }
>         private static INetTxConnection CreateActiveMqConnection()
>         {
>             var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional
>             };
>             return connectionFactory.CreateNetTxConnection();
>         }
>         private static void OnMessage(IMessage message)
>         {
>             var x = new TestSinglePhaseCommit();
>             var session2 = activeMqSession;
>             {
>                 Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);
>                 using (var producer = session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz")))
>                 {
>                     producer.Send(new ActiveMQTextMessage("foo"));
>                 }
>                 if (new Random().Next(2) == 0) throw new Exception();
>             }
>         }
>     }
>     internal class TestSinglePhaseCommit : ISinglePhaseNotification
>     {
>         public void Prepare(PreparingEnlistment preparingEnlistment)
>         {
>             Console.WriteLine("Tx Prepare");
>             preparingEnlistment.Prepared();
>         }
>         public void Commit(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx Commit");
>             enlistment.Done();
>         }
>         public void Rollback(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx Rollback");
>             enlistment.Done();
>         }
>         public void InDoubt(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx InDoubt");
>             enlistment.Done();
>         }
>         public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
>         {
>             Console.WriteLine("Tx SinglePhaseCommit");
>             singlePhaseEnlistment.Committed();
>         }
>     }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira