You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2014/08/28 16:33:09 UTC

[jira] [Updated] (AMQNET-471) Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable

     [ https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish updated AMQNET-471:
--------------------------------

    Fix Version/s: 1.7.0
                   1.6.4

> Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-471
>                 URL: https://issues.apache.org/jira/browse/AMQNET-471
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Timothy Bish
>             Fix For: 1.6.4, 1.7.0
>
>         Attachments: TransactionContext.cs.patch
>
>
> If the broker is down then the client can not commit the current message. An exception is thrown by the library. This is the behavior you would expect.
> If you then try and rollback the transaction on the session due to the exception and resume message consumption, the rolled back message will never be redelivered.
> {code:title=Failing Test|borderStyle=solid} 
>  [TestFixture, Explicit]
>     public class BrokerRestart
>     {
>         [Test]
>         public void Message_should_be_redilivered_if_broker_is_down_and_try_commit()
>         {
>             StartService(ActiveMqMaster);
>             DeleteQueue();
>             SendMessageToQueue();
>             var session = _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQ));
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             _log.Debug("Received message");
>             StopService(ActiveMqMaster);
>             _log.Debug("Commiting transaction");
>             try
>             {
>                 session.Commit();
>             }
>             catch (Exception ex)
>             {
>                 _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
>                 try
>                 {
>                     session.Rollback();
>                 }
>                 catch (Exception einner)
>                 {
>                     _log.Debug("Rollback transaction");
>                     _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250));
>                 }
>             }
>             StartService(ActiveMqMaster);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker Broker");
>         }
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQ)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue()
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, InQ);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
>             var factory = new ConnectionFactory(@"failover:(tcp://localhost:61616)")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy {All = 5}
>             };
>             _connection = factory.CreateConnection();
>             _connection.Start();
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
>         //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
>         private IConnection _connection;
>         private const string InQ = "integration-test-q";
>         private ILog _log;
>     }
> {code}
> {code:title=Passing Test With Patch|borderStyle=solid} 
> using System;
> using System.Configuration;
> using System.ServiceProcess;
> using System.Threading;
> using System.Threading.Tasks;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Jira
> {
>     [TestFixture]
>     public class BrokerRestart
>     {
>         //AMQNET-471
>         [Test]
>         public void Message_should_be_redilivered_if_broker_is_down_and_try_to_commit()
>         {
>             var session = _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
>             SendMessageToQueue();
>             consumer.Receive(TimeSpan.FromSeconds(5));
>             StopService(ActiveMqMaster);
>             var commiter = TryCommit(session);
>             StartService(ActiveMqMaster);
>             commiter.Wait();
>             var message = consumer.Receive(TimeSpan.FromSeconds(5));
>             TryCommit(session).Wait();
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>             Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(0));
>         }
>         //Commit blocks if the broker is down with the patch for AMQNET-471
>         private Task TryCommit(ISession session)
>         {
>             var task = Task.Factory.StartNew(() =>
>             {
>                 try
>                 {
>                     session.Commit();
>                 }
>                 catch (Exception ex)
>                 {
>                     _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
>                     try
>                     {
>                         session.Rollback();
>                     }
>                     catch (Exception einner)
>                     {
>                         _log.Debug("Rollback transaction");
>                         _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250));
>                     }
>                 }
>             });
>             //Give it a chance to start.
>             Thread.Sleep(1000);
>             return task;
>         }
>         private int CountMessagesInQueue(string queue)
>         {
>             var count = 0;
>             using (var session = _connection.CreateSession(AcknowledgementMode.Transactional))
>             using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
>             {
>                 while (true)
>                 {
>                     var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
>                     if (msg == null)
>                         break;
>                     count++;
>                 }
>             }
>             return count;
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running || service.Status == ServiceControllerStatus.StartPending)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker");
>         }
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue(string queue)
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, queue);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof(BrokerRestart).Name);
>             StartService(ActiveMqMaster);
>             _factory = new ConnectionFactory(ActiveMqConnectionString)
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy { All = 5 }
>             };
>             _connection = _factory.CreateConnection();
>             _log.Debug("Starting connection");
>             _connection.Start();
>             _log.Debug("Connection established");
>             DeleteQueue(InQueue);
>             DeleteQueue(OutQueue);
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         [TearDown]
>         public void TestTearDown()
>         {
>             _connection.Dispose();
>         }
>         protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
>         protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
>         private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
>         private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
>         private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
>         private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
>         private IConnection _connection;
>         private const string InQueue = "integration-test-q-in";
>         private const string OutQueue = "integration-test-q-out";
>         private ILog _log;
>         private ConnectionFactory _factory;
>     }
> }
> {code}
> {code:title=Java Client|borderStyle=solid} 
> import static org.junit.Assert.*;
> import org.junit.Test;
> import org.apache.activemq.*;
> import javax.jms.*;
> import javax.jms.Message;
> public class BrokerRestart {
> 	@Test
> 	public void test() throws Exception {
> 		String SERVICE_NAME = "ActiveMQ";
> 		String[] stop = {"cmd.exe", "/c", "sc", "stop", SERVICE_NAME};
> 		String[] start = {"cmd.exe", "/c", "sc", "start", SERVICE_NAME};
> 		
> 		Runtime.getRuntime().exec(start);
> 		Thread.sleep(2000);
> 		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
> 		SendMessage(connectionFactory);
> 		
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
>         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
>         Destination destination = session.createQueue("TEST.FOO");
>         MessageConsumer consumer = session.createConsumer(destination);
>         Message message = consumer.receive(1000);
> 		
> 		Runtime.getRuntime().exec(stop);
> 		Thread.sleep(2000);
> 		try
> 		{
> 			System.out.println("Committing transaction");
> 			//Looks like this blocks when the broker is down. If you start the service manually here, the test will pass.
> 			session.commit();
> 			System.out.println("Committed transaction");
> 		}
> 		catch(Exception e)
> 		{
> 			try
> 			{
> 				System.out.println("Transaction commit exception: " + e.toString());
> 				session.rollback();
> 			}
> 			catch(Exception e2)
> 			{
> 				System.out.println("Transaction rollback exception: " + e2.toString());
> 			}
> 		}
> 		Runtime.getRuntime().exec(start);
> 		Thread.sleep(5000);
>         message = consumer.receive(1000);
>         session.commit();
>         consumer.close();
>         session.close();
>         connection.close();
>         
>         assertNotNull(message);
> 	}
> 	
> 	private static void SendMessage(ActiveMQConnectionFactory connectionFactory) throws Exception
> 	{
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
>         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Destination destination = session.createQueue("TEST.FOO");
>         MessageProducer producer = session.createProducer(destination);
>         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>         String text = "Hello world! From: " + Thread.currentThread().getName();
>         TextMessage message = session.createTextMessage(text);
>         producer.send(message);
>         session.close();
>         connection.close();
> 	}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)