Posted to dev@activemq.apache.org by "Remo Gloor (JIRA)" <ji...@apache.org> on 2013/03/11 08:45:22 UTC

[jira] [Updated] (AMQNET-412) Messages are enlisted to the wrong transaction

     [ https://issues.apache.org/jira/browse/AMQNET-412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Remo Gloor updated AMQNET-412:
------------------------------

    Attachment: allDTCImprovments.patch
    
> Messages are enlisted to the wrong transaction
> ----------------------------------------------
>
>                 Key: AMQNET-412
>                 URL: https://issues.apache.org/jira/browse/AMQNET-412
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>         Environment: Apache.NMS.ActiveMq 1.5.7
>            Reporter: Remo Gloor
>            Assignee: Jim Gomes
>            Priority: Critical
>         Attachments: allDTCImprovments.patch, MessagesAreEnlistedToTheWrongTransaction.patch
>
>
> Under load, ActiveMQ enlists a message in a previous transaction. This leads to very strange behavior:
> - The database is updated and the message is rolled back
> - The message is completed but the database is rolled back
> All of this results in an invalid system state, making DTC unusable.
> Analysis of the source code has shown that the problem is in NetTxSession.DoStartTransaction. There it checks whether a .NET Transaction is already present in the TransactionContext. If so, it adds the message to that transaction. But this can be the previous transaction, because the DTC two-phase commit is asynchronous. It needs to check whether the current transaction is the same as the one before, and wait if they do not match.
> The following application demonstrates the problem when enough messages are processed, e.g. enqueue 100 messages in foo.bar. It is basically TestRedeliveredCase3, but with half of the messages failing.
> Whenever a SinglePhaseCommit occurs in TestSinglePhaseCommit, this means the database would be committed in its own transaction.
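>     using System;
>     using System.Transactions;
>     using Apache.NMS;
>     using Apache.NMS.ActiveMQ.Commands;
>     using Apache.NMS.Util;
>
>     class Program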
>     {
>         private static INetTxSession activeMqSession;
>         private static IMessageConsumer consumer;
>         private static INetTxConnection connection;
>         static void Main(string[] args)
>         {
>             using (connection = CreateActiveMqConnection())
>             using (activeMqSession = connection.CreateNetTxSession())
>             using (consumer = activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession, "queue://foo.bar")))
>             {
>                 connection.Start();
>                 while (true)
>                 {
>                     try
>                     {
>                         using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
>                         {
>                             IMessage msg = null;
>                             while (msg == null)
>                             {
>                                 msg = consumer.ReceiveNoWait();
>                             }
>                             OnMessage(msg);
>                             scoped.Complete();
>                         }
>                     }
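>                     catch (Exception)
>                     {
>                         // Expected for about half of the messages: the scope is disposed
>                         // without Complete(), so the ambient transaction rolls back.
>                     }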
>                 }
>             }
>         }
>         private static INetTxConnection CreateActiveMqConnection()
>         {
>             var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional
>             };
>             return connectionFactory.CreateNetTxConnection();
>         }
>         private static void OnMessage(IMessage message)
>         {
>             var x = new TestSinglePhaseCommit();
>             var session2 = activeMqSession;
>             {
>                 Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);
>                 using (var producer = session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz")))
>                 {
>                     producer.Send(new ActiveMQTextMessage("foo"));
>                 }
>                 // Fail roughly half of the messages so that their transactions roll back.
>                 if (new Random().Next(2) == 0) throw new Exception();
>             }
>         }
>     }
>     internal class TestSinglePhaseCommit : ISinglePhaseNotification
>     {
>         public void Prepare(PreparingEnlistment preparingEnlistment)
>         {
>             Console.WriteLine("Tx Prepare");
>             preparingEnlistment.Prepared();
>         }
>         public void Commit(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx Commit");
>             enlistment.Done();
>         }
>         public void Rollback(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx Rollback");
>             enlistment.Done();
>         }
>         public void InDoubt(Enlistment enlistment)
>         {
>             Console.WriteLine("Tx InDoubt");
>             enlistment.Done();
>         }
>         public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
>         {
>             Console.WriteLine("Tx SinglePhaseCommit");
>             singlePhaseEnlistment.Committed();
>         }
>     }
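
The check proposed in the description (compare the current transaction with the one that is still enlisted, and wait if they do not match) could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the attached patches: EnlistmentGate, Enter and Exit are hypothetical names that do not exist in Apache.NMS, and the sketch assumes the session would call Exit from its Commit, Rollback and InDoubt notifications once the previous transaction has really finished.

```csharp
using System;
using System.Threading;
using System.Transactions;

// Hypothetical helper (not part of Apache.NMS): allows one transaction to be
// enlisted at a time and makes a different transaction wait for the previous one.
public sealed class EnlistmentGate
{
    private readonly object sync = new object();
    private string activeId; // LocalIdentifier of the enlisted transaction, or null

    // Blocks until no other transaction is enlisted, then records `current`.
    // Returns false if the timeout elapses while another transaction is still active.
    public bool Enter(Transaction current, TimeSpan timeout)
    {
        if (current == null) throw new ArgumentNullException(nameof(current));
        string id = current.TransactionInformation.LocalIdentifier;
        DateTime deadline = DateTime.UtcNow + timeout;
        lock (sync)
        {
            // Wait only while a *different* transaction is still enlisted.
            while (activeId != null && activeId != id)
            {
                TimeSpan remaining = deadline - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero) return false;
                Monitor.Wait(sync, remaining);
            }
            activeId = id;
            return true;
        }
    }

    // Assumed to be called from the Commit/Rollback/InDoubt notifications,
    // before the transaction object is disposed.
    public void Exit(Transaction completed)
    {
        if (completed == null) throw new ArgumentNullException(nameof(completed));
        string id = completed.TransactionInformation.LocalIdentifier;
        lock (sync)
        {
            if (activeId == id)
            {
                activeId = null;
                Monitor.PulseAll(sync); // wake up callers waiting in Enter
            }
        }
    }
}
```

With such a gate, DoStartTransaction would call Enter(Transaction.Current, timeout) before adding a message to the transaction context, so a message received in a new TransactionScope could no longer be attached to a transaction whose asynchronous completion is still pending.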

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira