You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Imran (JIRA)" <ji...@apache.org> on 2014/03/03 11:54:25 UTC

[jira] [Updated] (AMQNET-472) Synchronous DTC Consumer will experience duplicates on transaction rollback

     [ https://issues.apache.org/jira/browse/AMQNET-472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran updated AMQNET-472:
-------------------------

    Description: 
Rollback when using DTC will result in a duplicate message being received.

{code:title=FailingTest|borderStyle=solid} 
using System;
using System.Configuration;
using System.ServiceProcess;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;
using Apache.NMS.Util;
using Common.Logging;
using Common.Logging.Simple;
using NUnit.Framework;

namespace IntegrationTests.ApacheNms.Jira
{
    [TestFixture]
    public class Dtc
    {
        [Test, Explicit("Bug in 1.6.2")]
        public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
        {
            SendMessageToQueue("1");
            SendMessageToQueue("2");
            var session = _connection.CreateSession();
            var sessionTwo = _connection.CreateSession();
            var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
            var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue));

            _log.Debug("Process message one and rollback");
            var transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
            var message = consumer.Receive(TimeSpan.FromSeconds(30));
            producer.Send(message);
            transaction.Dispose();

            _log.Debug("Processing message two and commit");
            transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
            message = consumer.Receive(TimeSpan.FromSeconds(30));
            producer.Send(message);
            transaction.Complete();
            transaction.Dispose();

            _log.Debug("Processing message one replay and commit");
            transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
            message = consumer.Receive(TimeSpan.FromSeconds(30));
            producer.Send(message);
            transaction.Complete();
            transaction.Dispose();

            _log.Debug("Process any repeats, there should be none");
            transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
            message = consumer.Receive(TimeSpan.FromSeconds(30));
            if (message != null)
                producer.Send(message);
            transaction.Complete();
            transaction.Dispose();

            session.Dispose();
            sessionTwo.Dispose();

            Assert.That(message, Is.Not.Null, "message was not redilivered");
            Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
            Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
        }

        public static void TransactionCallback(object s, TransactionEventArgs e)
        {
            LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction  {0}", e.Transaction.TransactionInformation.Status);
        }

        private int CountMessagesInQueue(string queue)
        {
            var count = 0;
            using (var session = _connection.CreateSession())
            using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
            using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                while (true)
                {
                    var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
                    if (msg == null)
                        break;
                    count++;
                }
            }

            return count;
        }

        private void StartService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Running)
                service.Start();
            service.WaitForStatus(ServiceControllerStatus.Running);
            _log.Debug("Started Broker");
        }

        private void StopService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Stopped)
                service.Stop();
            service.WaitForStatus(ServiceControllerStatus.Stopped);
            _log.Debug("Stopped Broker Broker");
        }

        private void SendMessageToQueue(string message)
        {
            using (var session = _connection.CreateSession())
            using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
            using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                producer.Send(producer.CreateTextMessage(message));
                scope.Complete();
            }
            _log.Debug("Primed Input Queue");
        }

        private void DeleteQueue(string queue)
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, queue);
            }
        }

        [SetUp]
        public void TestSetup()
        {
            LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
            _log = LogManager.GetLogger(typeof(Dtc).Name);
            StartService(ActiveMqMaster);
            StopService(ActiveMqSlave);
            _factory = new NetTxConnectionFactory(ActiveMqConnectionString)
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
                DispatchAsync = true,
                AsyncSend = false,
                PrefetchPolicy = new PrefetchPolicy { All = 5 },
            };
            _connection = _factory.CreateConnection();
            _log.Debug("Starting connection");
            _connection.Start();
            _log.Debug("Connection established");

            DeleteQueue(InQueue);
            DeleteQueue(OutQueue);
            //Tracer.Trace = new CommonLoggingTraceAdapter();
        }

        [TearDown]
        public void TestTearDown()
        {
            _connection.Dispose();
        }

        protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
        protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
        private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
        private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
        private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
        private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
        private IConnection _connection;
        private const string InQueue = "integration-test-q-in";
        private const string OutQueue = "integration-test-q-out";
        private ILog _log;
        private NetTxConnectionFactory _factory;
    }
}

  was:
Rollback when using DTC will result in a duplicate message being received.

{code:title=FailingTest|borderStyle=solid} 

using System;
using System.Configuration;
using System.ServiceProcess;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;
using Apache.NMS.Util;
using Common.Logging;
using Common.Logging.Simple;
using NUnit.Framework;

namespace IntegrationTests.ApacheNms.Jira
{
    [TestFixture]
    public class Dtc
    {
        [Test, Explicit("Bug in 1.6.2")]
        public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
        {
            SendMessageToQueue("1");
            SendMessageToQueue("2");
            var session = _connection.CreateSession();
            var sessionTwo = _connection.CreateSession();
            var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
            var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue));

            _log.Debug("Process message one and rollback");
            var transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
            var message = consumer.Receive(TimeSpan.FromSeconds(30));
            producer.Send(message);
            transaction.Dispose();

            _log.Debug("Processing message two and commit");
            transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
            message = consumer.Receive(TimeSpan.FromSeconds(30));
            producer.Send(message);
            transaction.Complete();
            transaction.Dispose();

            _log.Debug("Processing message one replay and commit");
            transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
            message = consumer.Receive(TimeSpan.FromSeconds(30));
            producer.Send(message);
            transaction.Complete();
            transaction.Dispose();

            _log.Debug("Process any repeats, there should be none");
            transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
            message = consumer.Receive(TimeSpan.FromSeconds(30));
            if (message != null)
                producer.Send(message);
            transaction.Complete();
            transaction.Dispose();

            session.Dispose();
            sessionTwo.Dispose();

            Assert.That(message, Is.Not.Null, "message was not redilivered");
            Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
            Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
        }

        public static void TransactionCallback(object s, TransactionEventArgs e)
        {
            LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction  {0}", e.Transaction.TransactionInformation.Status);
        }

        private int CountMessagesInQueue(string queue)
        {
            var count = 0;
            using (var session = _connection.CreateSession())
            using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
            using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                while (true)
                {
                    var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
                    if (msg == null)
                        break;
                    count++;
                }
            }

            return count;
        }

        private void StartService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Running)
                service.Start();
            service.WaitForStatus(ServiceControllerStatus.Running);
            _log.Debug("Started Broker");
        }

        private void StopService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Stopped)
                service.Stop();
            service.WaitForStatus(ServiceControllerStatus.Stopped);
            _log.Debug("Stopped Broker Broker");
        }

        private void SendMessageToQueue(string message)
        {
            using (var session = _connection.CreateSession())
            using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
            using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                producer.Send(producer.CreateTextMessage(message));
                scope.Complete();
            }
            _log.Debug("Primed Input Queue");
        }

        private void DeleteQueue(string queue)
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, queue);
            }
        }

        [SetUp]
        public void TestSetup()
        {
            LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
            _log = LogManager.GetLogger(typeof(Dtc).Name);
            StartService(ActiveMqMaster);
            StopService(ActiveMqSlave);
            _factory = new NetTxConnectionFactory(ActiveMqConnectionString)
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
                DispatchAsync = true,
                AsyncSend = false,
                PrefetchPolicy = new PrefetchPolicy { All = 5 },
            };
            _connection = _factory.CreateConnection();
            _log.Debug("Starting connection");
            _connection.Start();
            _log.Debug("Connection established");

            DeleteQueue(InQueue);
            DeleteQueue(OutQueue);
            //Tracer.Trace = new CommonLoggingTraceAdapter();
        }

        [TearDown]
        public void TestTearDown()
        {
            _connection.Dispose();
        }

        protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
        protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
        private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
        private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
        private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
        private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
        private IConnection _connection;
        private const string InQueue = "integration-test-q-in";
        private const string OutQueue = "integration-test-q-out";
        private ILog _log;
        private NetTxConnectionFactory _factory;
    }
}


> Synchronous DTC Consumer will experience duplicates on transaction rollback
> ---------------------------------------------------------------------------
>
>                 Key: AMQNET-472
>                 URL: https://issues.apache.org/jira/browse/AMQNET-472
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>
> Rollback when using DTC will result in a duplicate message being received.
> {code:title=FailingTest|borderStyle=solid} 
> using System;
> using System.Configuration;
> using System.ServiceProcess;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Jira
> {
>     [TestFixture]
>     public class Dtc
>     {
>         [Test, Explicit("Bug in 1.6.2")]
>         public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
>         {
>             SendMessageToQueue("1");
>             SendMessageToQueue("2");
>             var session = _connection.CreateSession();
>             var sessionTwo = _connection.CreateSession();
>             var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
>             var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue));
>             _log.Debug("Process message one and rollback");
>             var transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             producer.Send(message);
>             transaction.Dispose();
>             _log.Debug("Processing message two and commit");
>             transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             producer.Send(message);
>             transaction.Complete();
>             transaction.Dispose();
>             _log.Debug("Processing message one replay and commit");
>             transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             producer.Send(message);
>             transaction.Complete();
>             transaction.Dispose();
>             _log.Debug("Process any repeats, there should be none");
>             transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             if (message != null)
>                 producer.Send(message);
>             transaction.Complete();
>             transaction.Dispose();
>             session.Dispose();
>             sessionTwo.Dispose();
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>             Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
>             Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
>         }
>         public static void TransactionCallback(object s, TransactionEventArgs e)
>         {
>             LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction  {0}", e.Transaction.TransactionInformation.Status);
>         }
>         private int CountMessagesInQueue(string queue)
>         {
>             var count = 0;
>             using (var session = _connection.CreateSession())
>             using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
>             using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
>             {
>                 while (true)
>                 {
>                     var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
>                     if (msg == null)
>                         break;
>                     count++;
>                 }
>             }
>             return count;
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker Broker");
>         }
>         private void SendMessageToQueue(string message)
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
>             using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
>             {
>                 producer.Send(producer.CreateTextMessage(message));
>                 scope.Complete();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue(string queue)
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, queue);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof(Dtc).Name);
>             StartService(ActiveMqMaster);
>             StopService(ActiveMqSlave);
>             _factory = new NetTxConnectionFactory(ActiveMqConnectionString)
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
>                 DispatchAsync = true,
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy { All = 5 },
>             };
>             _connection = _factory.CreateConnection();
>             _log.Debug("Starting connection");
>             _connection.Start();
>             _log.Debug("Connection established");
>             DeleteQueue(InQueue);
>             DeleteQueue(OutQueue);
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         [TearDown]
>         public void TestTearDown()
>         {
>             _connection.Dispose();
>         }
>         protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
>         protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
>         private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
>         private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
>         private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
>         private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
>         private IConnection _connection;
>         private const string InQueue = "integration-test-q-in";
>         private const string OutQueue = "integration-test-q-out";
>         private ILog _log;
>         private NetTxConnectionFactory _factory;
>     }
> }



--
This message was sent by Atlassian JIRA
(v6.2#6252)