You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Imran (JIRA)" <ji...@apache.org> on 2014/03/03 11:54:25 UTC
[jira] [Updated] (AMQNET-472) Synchronous DTC Consumer will
experience duplicates on transaction rollback
[ https://issues.apache.org/jira/browse/AMQNET-472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Imran updated AMQNET-472:
-------------------------
Description:
Rollback when using DTC will result in a duplicate message being received.
{code:title=FailingTest|borderStyle=solid}
using System;
using System.Configuration;
using System.ServiceProcess;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;
using Apache.NMS.Util;
using Common.Logging;
using Common.Logging.Simple;
using NUnit.Framework;
namespace IntegrationTests.ApacheNms.Jira
{
[TestFixture]
public class Dtc
{
[Test, Explicit("Bug in 1.6.2")]
public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
{
SendMessageToQueue("1");
SendMessageToQueue("2");
var session = _connection.CreateSession();
var sessionTwo = _connection.CreateSession();
var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue));
_log.Debug("Process message one and rollback");
var transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
var message = consumer.Receive(TimeSpan.FromSeconds(30));
producer.Send(message);
transaction.Dispose();
_log.Debug("Processing message two and commit");
transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
message = consumer.Receive(TimeSpan.FromSeconds(30));
producer.Send(message);
transaction.Complete();
transaction.Dispose();
_log.Debug("Processing message one replay and commit");
transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
message = consumer.Receive(TimeSpan.FromSeconds(30));
producer.Send(message);
transaction.Complete();
transaction.Dispose();
_log.Debug("Process any repeats, there should be none");
transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
message = consumer.Receive(TimeSpan.FromSeconds(30));
if (message != null)
producer.Send(message);
transaction.Complete();
transaction.Dispose();
session.Dispose();
sessionTwo.Dispose();
Assert.That(message, Is.Not.Null, "message was not redilivered");
Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
}
public static void TransactionCallback(object s, TransactionEventArgs e)
{
LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction {0}", e.Transaction.TransactionInformation.Status);
}
private int CountMessagesInQueue(string queue)
{
var count = 0;
using (var session = _connection.CreateSession())
using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
{
while (true)
{
var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
if (msg == null)
break;
count++;
}
}
return count;
}
private void StartService(ServiceController service)
{
if (service.Status != ServiceControllerStatus.Running)
service.Start();
service.WaitForStatus(ServiceControllerStatus.Running);
_log.Debug("Started Broker");
}
private void StopService(ServiceController service)
{
if (service.Status != ServiceControllerStatus.Stopped)
service.Stop();
service.WaitForStatus(ServiceControllerStatus.Stopped);
_log.Debug("Stopped Broker Broker");
}
private void SendMessageToQueue(string message)
{
using (var session = _connection.CreateSession())
using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
{
producer.Send(producer.CreateTextMessage(message));
scope.Complete();
}
_log.Debug("Primed Input Queue");
}
private void DeleteQueue(string queue)
{
using (var session = _connection.CreateSession())
{
SessionUtil.DeleteDestination(session, queue);
}
}
[SetUp]
public void TestSetup()
{
LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
_log = LogManager.GetLogger(typeof(Dtc).Name);
StartService(ActiveMqMaster);
StopService(ActiveMqSlave);
_factory = new NetTxConnectionFactory(ActiveMqConnectionString)
{
AcknowledgementMode = AcknowledgementMode.Transactional,
RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
DispatchAsync = true,
AsyncSend = false,
PrefetchPolicy = new PrefetchPolicy { All = 5 },
};
_connection = _factory.CreateConnection();
_log.Debug("Starting connection");
_connection.Start();
_log.Debug("Connection established");
DeleteQueue(InQueue);
DeleteQueue(OutQueue);
//Tracer.Trace = new CommonLoggingTraceAdapter();
}
[TearDown]
public void TestTearDown()
{
_connection.Dispose();
}
protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
private IConnection _connection;
private const string InQueue = "integration-test-q-in";
private const string OutQueue = "integration-test-q-out";
private ILog _log;
private NetTxConnectionFactory _factory;
}
}
was:
Rollback when using DTC will result in a duplicate message being received.
{code:title=FailingTest|borderStyle=solid}
using System;
using System.Configuration;
using System.ServiceProcess;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;
using Apache.NMS.Util;
using Common.Logging;
using Common.Logging.Simple;
using NUnit.Framework;
namespace IntegrationTests.ApacheNms.Jira
{
[TestFixture]
public class Dtc
{
[Test, Explicit("Bug in 1.6.2")]
public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
{
SendMessageToQueue("1");
SendMessageToQueue("2");
var session = _connection.CreateSession();
var sessionTwo = _connection.CreateSession();
var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue));
_log.Debug("Process message one and rollback");
var transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
var message = consumer.Receive(TimeSpan.FromSeconds(30));
producer.Send(message);
transaction.Dispose();
_log.Debug("Processing message two and commit");
transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
message = consumer.Receive(TimeSpan.FromSeconds(30));
producer.Send(message);
transaction.Complete();
transaction.Dispose();
_log.Debug("Processing message one replay and commit");
transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
message = consumer.Receive(TimeSpan.FromSeconds(30));
producer.Send(message);
transaction.Complete();
transaction.Dispose();
_log.Debug("Process any repeats, there should be none");
transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
message = consumer.Receive(TimeSpan.FromSeconds(30));
if (message != null)
producer.Send(message);
transaction.Complete();
transaction.Dispose();
session.Dispose();
sessionTwo.Dispose();
Assert.That(message, Is.Not.Null, "message was not redilivered");
Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
}
public static void TransactionCallback(object s, TransactionEventArgs e)
{
LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction {0}", e.Transaction.TransactionInformation.Status);
}
private int CountMessagesInQueue(string queue)
{
var count = 0;
using (var session = _connection.CreateSession())
using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
{
while (true)
{
var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
if (msg == null)
break;
count++;
}
}
return count;
}
private void StartService(ServiceController service)
{
if (service.Status != ServiceControllerStatus.Running)
service.Start();
service.WaitForStatus(ServiceControllerStatus.Running);
_log.Debug("Started Broker");
}
private void StopService(ServiceController service)
{
if (service.Status != ServiceControllerStatus.Stopped)
service.Stop();
service.WaitForStatus(ServiceControllerStatus.Stopped);
_log.Debug("Stopped Broker Broker");
}
private void SendMessageToQueue(string message)
{
using (var session = _connection.CreateSession())
using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
{
producer.Send(producer.CreateTextMessage(message));
scope.Complete();
}
_log.Debug("Primed Input Queue");
}
private void DeleteQueue(string queue)
{
using (var session = _connection.CreateSession())
{
SessionUtil.DeleteDestination(session, queue);
}
}
[SetUp]
public void TestSetup()
{
LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
_log = LogManager.GetLogger(typeof(Dtc).Name);
StartService(ActiveMqMaster);
StopService(ActiveMqSlave);
_factory = new NetTxConnectionFactory(ActiveMqConnectionString)
{
AcknowledgementMode = AcknowledgementMode.Transactional,
RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
DispatchAsync = true,
AsyncSend = false,
PrefetchPolicy = new PrefetchPolicy { All = 5 },
};
_connection = _factory.CreateConnection();
_log.Debug("Starting connection");
_connection.Start();
_log.Debug("Connection established");
DeleteQueue(InQueue);
DeleteQueue(OutQueue);
//Tracer.Trace = new CommonLoggingTraceAdapter();
}
[TearDown]
public void TestTearDown()
{
_connection.Dispose();
}
protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
private IConnection _connection;
private const string InQueue = "integration-test-q-in";
private const string OutQueue = "integration-test-q-out";
private ILog _log;
private NetTxConnectionFactory _factory;
}
}
> Synchronous DTC Consumer will experience duplicates on transaction rollback
> ---------------------------------------------------------------------------
>
> Key: AMQNET-472
> URL: https://issues.apache.org/jira/browse/AMQNET-472
> Project: ActiveMQ .Net
> Issue Type: Bug
> Components: ActiveMQ
> Affects Versions: 1.6.2
> Reporter: Imran
> Assignee: Jim Gomes
>
> Rollback when using DTC will result in a duplicate message being received.
> {code:title=FailingTest|borderStyle=solid}
> using System;
> using System.Configuration;
> using System.ServiceProcess;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Jira
> {
> [TestFixture]
> public class Dtc
> {
> [Test, Explicit("Bug in 1.6.2")]
> public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
> {
> SendMessageToQueue("1");
> SendMessageToQueue("2");
> var session = _connection.CreateSession();
> var sessionTwo = _connection.CreateSession();
> var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
> var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue));
> _log.Debug("Process message one and rollback");
> var transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
> var message = consumer.Receive(TimeSpan.FromSeconds(30));
> producer.Send(message);
> transaction.Dispose();
> _log.Debug("Processing message two and commit");
> transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
> message = consumer.Receive(TimeSpan.FromSeconds(30));
> producer.Send(message);
> transaction.Complete();
> transaction.Dispose();
> _log.Debug("Processing message one replay and commit");
> transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
> message = consumer.Receive(TimeSpan.FromSeconds(30));
> producer.Send(message);
> transaction.Complete();
> transaction.Dispose();
> _log.Debug("Process any repeats, there should be none");
> transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
> message = consumer.Receive(TimeSpan.FromSeconds(30));
> if (message != null)
> producer.Send(message);
> transaction.Complete();
> transaction.Dispose();
> session.Dispose();
> sessionTwo.Dispose();
> Assert.That(message, Is.Not.Null, "message was not redilivered");
> Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
> Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
> }
> public static void TransactionCallback(object s, TransactionEventArgs e)
> {
> LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction {0}", e.Transaction.TransactionInformation.Status);
> }
> private int CountMessagesInQueue(string queue)
> {
> var count = 0;
> using (var session = _connection.CreateSession())
> using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
> using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
> {
> while (true)
> {
> var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
> if (msg == null)
> break;
> count++;
> }
> }
> return count;
> }
> private void StartService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Running)
> service.Start();
> service.WaitForStatus(ServiceControllerStatus.Running);
> _log.Debug("Started Broker");
> }
> private void StopService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Stopped)
> service.Stop();
> service.WaitForStatus(ServiceControllerStatus.Stopped);
> _log.Debug("Stopped Broker Broker");
> }
> private void SendMessageToQueue(string message)
> {
> using (var session = _connection.CreateSession())
> using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
> using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
> {
> producer.Send(producer.CreateTextMessage(message));
> scope.Complete();
> }
> _log.Debug("Primed Input Queue");
> }
> private void DeleteQueue(string queue)
> {
> using (var session = _connection.CreateSession())
> {
> SessionUtil.DeleteDestination(session, queue);
> }
> }
> [SetUp]
> public void TestSetup()
> {
> LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
> _log = LogManager.GetLogger(typeof(Dtc).Name);
> StartService(ActiveMqMaster);
> StopService(ActiveMqSlave);
> _factory = new NetTxConnectionFactory(ActiveMqConnectionString)
> {
> AcknowledgementMode = AcknowledgementMode.Transactional,
> RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
> DispatchAsync = true,
> AsyncSend = false,
> PrefetchPolicy = new PrefetchPolicy { All = 5 },
> };
> _connection = _factory.CreateConnection();
> _log.Debug("Starting connection");
> _connection.Start();
> _log.Debug("Connection established");
> DeleteQueue(InQueue);
> DeleteQueue(OutQueue);
> //Tracer.Trace = new CommonLoggingTraceAdapter();
> }
> [TearDown]
> public void TestTearDown()
> {
> _connection.Dispose();
> }
> protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
> protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
> private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
> private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
> private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
> private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
> private IConnection _connection;
> private const string InQueue = "integration-test-q-in";
> private const string OutQueue = "integration-test-q-out";
> private ILog _log;
> private NetTxConnectionFactory _factory;
> }
> }
--
This message was sent by Atlassian JIRA
(v6.2#6252)