You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Imran (JIRA)" <ji...@apache.org> on 2014/03/20 15:22:42 UTC

[jira] [Commented] (AMQNET-474) DTC Consumer is forcibly closed if a transaction is in progress and connection to the broker is interrupted

    [ https://issues.apache.org/jira/browse/AMQNET-474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13941767#comment-13941767 ] 

Imran commented on AMQNET-474:
------------------------------

I have identified the code that intentionally closes the connection when a broker restart occurs. As far as I can see this isn't required. I have attached a patch for this.

> DTC Consumer is forcibly closed if a transaction is in progress and connection to the broker is interrupted
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-474
>                 URL: https://issues.apache.org/jira/browse/AMQNET-474
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>         Attachments: NetTxTransactionContext.cs.patch
>
>
> A DTC consumer is forcibly closed if a transaction is in progress and the connection to the broker is interrupted. This behavior differs from that of non-DTC consumers. It happens even when a failover connection is specified, which is not the correct behavior: you would expect the failover feature to re-establish the connection on behalf of the client.
> {code}
> using System;
> using System.ServiceProcess;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Tests.Jira.DistributedTransaction
> {
>     [TestFixture]
>     public class BrokerRestartAndFailover
>     {
>         /// <summary>
>         /// Receives a message inside a distributed transaction, restarts the broker while that
>         /// transaction is still open, and then checks that the same consumer receives exactly one
>         /// message afterwards and that the input queue ends up empty.
>         /// </summary>
>         [Test, Explicit("After a broker restart the consumer is forcibly closed. This is not desirable as this behaviour is different to non dtc consumers.")]
>         public void Should_rediliver_message_after_broker_restart()
>         {
>             SendMessageToQueue("1");
>             var session = _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
>             // The using block guarantees the scope is disposed even when the receive or the
>             // service restart throws, so no ambient transaction is left behind on the test thread.
>             using (var interruptedScope = new TransactionScope(TransactionScopeOption.RequiresNew))
>             {
>                 consumer.Receive(TimeSpan.FromSeconds(1));
>                 // Bounce the broker while the transaction is still in progress.
>                 StopService(ActiveMqMaster);
>                 StartService(ActiveMqMaster);
>                 interruptedScope.Complete();
>             }
>             //try a few times to drain the queue
>             var messageRedilivered = 0;
>             for (var i = 0; i < 2; i++)
>             {
>                 using (var retryScope = new TransactionScope(TransactionScopeOption.RequiresNew))
>                 {
>                     try
>                     {
>                         var message = consumer.Receive(TimeSpan.FromSeconds(1));
>                         retryScope.Complete();
>                         if (message != null)
>                         {
>                             messageRedilivered++;
>                         }
>                     }
>                     catch (Exception ex)
>                     {
>                         // Deliberately best effort: a failed attempt is logged and the next
>                         // iteration tries again; the assertions below decide the outcome.
>                         LogManager.GetCurrentClassLogger().Error(ex);
>                     }
>                 }
>             }
>             Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
>             Assert.That(messageRedilivered, Is.EqualTo(1));
>         }
>         /// <summary>
>         /// Counts how many messages a separate, newly created connection can receive from
>         /// <paramref name="queue"/>.
>         /// </summary>
>         /// <param name="queue">Name of the queue to read from.</param>
>         /// <returns>Number of messages received before a one-second receive returned nothing.</returns>
>         public int CountMessagesInQueue(string queue)
>         {
>             var countingFactory = new ConnectionFactory(ConnectionString);
>             countingFactory.AcknowledgementMode = AcknowledgementMode.Transactional;
>             // NOTE(review): nothing is committed on this session, so presumably the received
>             // messages are rolled back when it closes and the queue is left untouched - confirm.
>             using (var countingConnection = countingFactory.CreateConnection())
>             using (var countingSession = countingConnection.CreateSession())
>             using (var reader = countingSession.CreateConsumer(SessionUtil.GetQueue(countingSession, queue)))
>             {
>                 countingConnection.Start();
>                 var received = 0;
>                 // Keep pulling until a receive times out with no message.
>                 while (reader.Receive(TimeSpan.FromSeconds(1)) != null)
>                 {
>                     received++;
>                 }
>                 return received;
>             }
>         }
>         /// <summary>
>         /// Deletes the destination named <paramref name="queue"/> through a short-lived session
>         /// on the shared test connection.
>         /// </summary>
>         /// <param name="queue">Name of the destination to delete.</param>
>         private void DeleteQueue(string queue)
>         {
>             using (var cleanupSession = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(cleanupSession, queue);
>             }
>         }
>         /// <summary>
>         /// Sends one text message to the input queue inside its own transaction scope and
>         /// logs once the send has completed.
>         /// </summary>
>         /// <param name="message">Text body of the message to send.</param>
>         private void SendMessageToQueue(string message)
>         {
>             using (var sendSession = _connection.CreateSession())
>             {
>                 var target = SessionUtil.GetDestination(sendSession, InQueue);
>                 using (var sender = sendSession.CreateProducer(target))
>                 using (var sendScope = new TransactionScope(TransactionScopeOption.RequiresNew))
>                 {
>                     var textMessage = sender.CreateTextMessage(message);
>                     sender.Send(textMessage);
>                     sendScope.Complete();
>                 }
>             }
>             Log.Debug("Primed Input Queue");
>         }
>         /// <summary>
>         /// Starts <paramref name="service"/> unless it is already running, then blocks until it
>         /// reports the Running status.
>         /// </summary>
>         /// <param name="service">Controller of the Windows service to start.</param>
>         private void StartService(ServiceController service)
>         {
>             // ServiceController caches its property values; refresh first so the decision below
>             // is made on the current state rather than on a stale snapshot (a stale "Running"
>             // would skip Start() and leave WaitForStatus blocked).
>             service.Refresh();
>             if (service.Status != ServiceControllerStatus.Running)
>             {
>                 service.Start();
>             }
>             service.WaitForStatus(ServiceControllerStatus.Running);
>         }
>         /// <summary>
>         /// Stops <paramref name="service"/> unless it is already stopped, then blocks until it
>         /// reports the Stopped status.
>         /// </summary>
>         /// <param name="service">Controller of the Windows service to stop.</param>
>         private void StopService(ServiceController service)
>         {
>             // ServiceController caches its property values; refresh first so the decision below
>             // is made on the current state rather than on a stale snapshot (a stale "Stopped"
>             // would skip Stop() and leave WaitForStatus blocked).
>             service.Refresh();
>             if (service.Status != ServiceControllerStatus.Stopped)
>             {
>                 service.Stop();
>             }
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>         }
>         /// <summary>
>         /// Prepares each test: configures console logging, makes sure the master broker service
>         /// is running and the slave is stopped, opens and starts a NetTx connection with
>         /// interruption/resume/exception logging attached, and deletes both test queues.
>         /// </summary>
>         [SetUp]
>         public void TestSetup()
>         {
>             // "mm" is minutes; the previous "MM" printed the month in the middle of the timestamp.
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:mm:ss");
>             StartService(ActiveMqMaster);
>             StopService(ActiveMqSlave);
>             _connectionFactory = new NetTxConnectionFactory(ConnectionString)
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 10, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
>                 DispatchAsync = true,
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy { All = 10 },
>             };
>             _connection = _connectionFactory.CreateConnection();
>             // Log connection state changes so a broker restart is visible in the test output.
>             _connection.ConnectionInterruptedListener += () => LogManager.GetCurrentClassLogger().Debug("Connection interrupted");
>             _connection.ConnectionResumedListener += () => LogManager.GetCurrentClassLogger().Debug("Connection resumed");
>             _connection.ExceptionListener += ex => LogManager.GetCurrentClassLogger().ErrorFormat("Connection exception: '{0}'", ex.ToString());
>             _connection.Start();
>             // Remove both queues so the test does not see messages from an earlier run.
>             DeleteQueue(InQueue);
>             DeleteQueue(OutQueue);
>         }
>         /// <summary>
>         /// Restores the broker services to their default state (master running, slave stopped)
>         /// and releases the connection opened in <see cref="TestSetup"/>.
>         /// </summary>
>         [TearDown]
>         public void TestTeardown()
>         {
>             StartService(ActiveMqMaster);
>             StopService(ActiveMqSlave);
>             // TestSetup opens a new connection for every test; without this it is never closed
>             // and its sessions and consumers stay attached to the broker after the test.
>             if (_connection != null)
>             {
>                 _connection.Dispose();
>                 _connection = null;
>             }
>         }
>         private const string ConnectionString = @"failover:(tcp://localhost:61616)"; // broker URI used by both connection factories in this fixture
>         protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ"); // Windows service started in setup/teardown and restarted by the test
>         protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave"); // Windows service kept stopped by setup/teardown
>         private IConnection _connection; // shared connection created and started in TestSetup
>         private const string InQueue = "in-q"; // queue the test sends to and receives from
>         private const string OutQueue = "out-q"; // only deleted in TestSetup; not otherwise used in this fixture
>         private static readonly ILog Log = LogManager.GetLogger(typeof(BrokerRestartAndFailover).Name); // logger named after this fixture
>         private NetTxConnectionFactory _connectionFactory; // factory configured in TestSetup
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)