You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2014/08/29 17:30:53 UTC
[jira] [Assigned] (AMQNET-474) DTC Consumer is forcibly closed if a
transaction is in progress and connection to the broker is interrupted
[ https://issues.apache.org/jira/browse/AMQNET-474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish reassigned AMQNET-474:
-----------------------------------
Assignee: Timothy Bish (was: Jim Gomes)
> DTC Consumer is forcibly closed if a transaction is in progress and connection to the broker is interrupted
> -----------------------------------------------------------------------------------------------------------
>
> Key: AMQNET-474
> URL: https://issues.apache.org/jira/browse/AMQNET-474
> Project: ActiveMQ .Net
> Issue Type: Bug
> Components: ActiveMQ
> Affects Versions: 1.6.2
> Reporter: Imran
> Assignee: Timothy Bish
> Attachments: NetTxTransactionContext.cs.patch
>
>
> DTC Consumer is forcibly closed if a transaction is in progress and the connection to the broker is interrupted. This behavior is different to non DTC consumers. This happens with a fail over connection specified which is not the correct behavior as you would expect the fail over feature to reestablish the connection on behalf of the client.
> {code}
> using System;
> using System.ServiceProcess;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Tests.Jira.DistributedTransaction
> {
> [TestFixture]
> public class BrokerRestartAndFailover
> {
> [Test, Explicit("After a broker restart the consumer is forcibly closed. This is not desirable as this behaviour is different to non dtc consumers.")]
> public void Should_rediliver_message_after_broker_restart()
> {
> SendMessageToQueue("1");
> var session = _connection.CreateSession(AcknowledgementMode.Transactional);
> var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
> var transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
> consumer.Receive(TimeSpan.FromSeconds(1));
> StopService(ActiveMqMaster);
> StartService(ActiveMqMaster);
> transaction.Complete();
> transaction.Dispose();
> //try a few times to drain the queue
> var messageRedilivered = 0;
> for (var i = 0; i < 2; i++)
> {
> transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
> try
> {
> var message = consumer.Receive(TimeSpan.FromSeconds(1));
> transaction.Complete();
> if (message != null)
> messageRedilivered++;
> }
> catch (Exception ex)
> {
> LogManager.GetCurrentClassLogger().Error(ex);
> }
> finally
> {
> transaction.Dispose();
> }
> }
> Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
> Assert.That(messageRedilivered, Is.EqualTo(1));
> }
> public int CountMessagesInQueue(string queue)
> {
> var factory = new ConnectionFactory(ConnectionString)
> {
> AcknowledgementMode = AcknowledgementMode.Transactional
> };
>
> var count = 0;
> using (var connection = factory.CreateConnection())
> using (var session = connection.CreateSession())
> using (var consumer = session.CreateConsumer(SessionUtil.GetQueue(session, queue)))
> {
> connection.Start();
> while (true)
> {
> var message = consumer.Receive(TimeSpan.FromSeconds(1));
> if (message == null)
> break;
> count++;
> }
> }
> return count;
> }
> private void DeleteQueue(string queue)
> {
> using (var session = _connection.CreateSession())
> {
> SessionUtil.DeleteDestination(session, queue);
> }
> }
> private void SendMessageToQueue(string message)
> {
> using (var session = _connection.CreateSession())
> using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
> using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
> {
> producer.Send(producer.CreateTextMessage(message));
> scope.Complete();
> }
> Log.Debug("Primed Input Queue");
> }
> private void StartService(ServiceController service)
> {
> if(service.Status != ServiceControllerStatus.Running)
> service.Start();
> service.WaitForStatus(ServiceControllerStatus.Running);
> }
> private void StopService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Stopped)
> service.Stop();
> service.WaitForStatus(ServiceControllerStatus.Stopped);
> }
> [SetUp]
> public void TestSetup()
> {
> LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
> StartService(ActiveMqMaster);
> StopService(ActiveMqSlave);
> _connectionFactory = new NetTxConnectionFactory(ConnectionString)
> {
> AcknowledgementMode = AcknowledgementMode.Transactional,
> RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 10, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
> DispatchAsync = true,
> AsyncSend = false,
> PrefetchPolicy = new PrefetchPolicy { All = 10 },
> };
> _connection = _connectionFactory.CreateConnection();
> _connection.ConnectionInterruptedListener += () => LogManager.GetCurrentClassLogger().Debug("Connection interrupted");
> _connection.ConnectionResumedListener += () => LogManager.GetCurrentClassLogger().Debug("Connection resumed");
> _connection.ExceptionListener += ex => LogManager.GetCurrentClassLogger().ErrorFormat("Connection exception: '{0}'", ex.ToString());
> _connection.Start();
> DeleteQueue(InQueue);
> DeleteQueue(OutQueue);
> }
> [TearDown]
> public void TestTeardown()
> {
> StartService(ActiveMqMaster);
> StopService(ActiveMqSlave);
> }
> private const string ConnectionString = @"failover:(tcp://localhost:61616)";
> protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
> protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
> private IConnection _connection;
> private const string InQueue = "in-q";
> private const string OutQueue = "out-q";
> private static readonly ILog Log = LogManager.GetLogger(typeof(BrokerRestartAndFailover).Name);
> private NetTxConnectionFactory _connectionFactory;
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)