You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Jean-Baptiste Onofré (Jira)" <ji...@apache.org> on 2020/11/22 10:44:00 UTC

[jira] [Commented] (AMQ-8050) XAException when failing over in the middle of a transaction

    [ https://issues.apache.org/jira/browse/AMQ-8050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17236903#comment-17236903 ] 

Jean-Baptiste Onofré commented on AMQ-8050:
-------------------------------------------

I guess you have the same behavior with ActiveMQ 5.15.x right ?

> XAException when failing over in the middle of a transaction
> ------------------------------------------------------------
>
>                 Key: AMQ-8050
>                 URL: https://issues.apache.org/jira/browse/AMQ-8050
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JMS client, Network of Brokers
>    Affects Versions: 5.16.0
>            Reporter: Erik Håkansson
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>
> We have been plagued in production by growing disk usage in KahaDB on our ActiveMQs. We have found that this is caused by hanging transactions, and the only solution so far has been to restart the broker. The hanging transactions happen when we have the occasional network glitch. The networking is out of our control, and not something we can fix.
> However, we have found a workaround. Our clients are MDBs in Wildfly. If we disable failover for these, and instead let Wildfly handle creating new connections we don't see the issue.
> I have been able to reproduce the error in a unit test. When there is a connection disturbance in the middle of a transaction (on the consumer end) and the client fails over to another broker in the network; it tries to commit the transaction on the new broker. 
>  This fails with 
> {noformat}
> Transaction 'XID:[...]' has not been started. xaErrorCode:-4`{noformat}
> and the transaction ends up in a weird state on the broker.
> We are not using any replicated persistence adapters, just local kahaDB for each broker in the network.
> I'm not sure if the error is actually in the client, that can't handle failover during a transaction, or in the broker that doesn't distribute the transaction properly to the other brokers in the network.
> I'm also very open to the possibility that this is simply a configuration error on our end, but if so, I have no idea what.
> I'm adding the unit test where I have reproduced it. I happily admit that I don't know much about how transactions actually behave in reality, so I might have misconfigured them here, but we see the exact same behaviour in production code where transactions are managed by Wildfly.
> {code:java}
> package test;
> import static org.awaitility.Awaitility.await;
> import org.apache.activemq.ActiveMQMessageConsumer;
> import org.apache.activemq.ActiveMQXAConnection;
> import org.apache.activemq.ActiveMQXAConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.broker.TransportConnector;
> import org.apache.activemq.broker.region.policy.PolicyEntry;
> import org.apache.activemq.broker.region.policy.PolicyMap;
> import org.apache.activemq.transport.failover.FailoverTransport;
> import org.junit.Assert;
> import org.junit.Test;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.io.DataOutputStream;
> import java.io.IOException;
> import java.net.URI;
> import java.time.Duration;
> import java.time.Instant;
> import java.time.temporal.ChronoUnit;
> import java.util.Objects;
> import java.util.UUID;
> import java.util.concurrent.atomic.AtomicLong;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.TextMessage;
> import javax.jms.XAConnection;
> import javax.jms.XASession;
> import javax.transaction.xa.XAException;
> import javax.transaction.xa.XAResource;
> import javax.transaction.xa.Xid;
> public class FailoverErrorTest {
>   private static final Logger logger = LoggerFactory.getLogger(ClusterTest.class);
>   private static BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
>     BrokerService broker = new BrokerService();
>     broker.setUseJmx(true);
>     broker.setAdvisorySupport(true);
>     TransportConnector transportConnector = new TransportConnector();
>     transportConnector.setName("openwire");
>     transportConnector.setUri(URI.create(bindAddress));
>     transportConnector.setRebalanceClusterClients(true);
>     transportConnector.setUpdateClusterClientsOnRemove(true);
>     broker.addConnector(transportConnector);
>     broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
>     PolicyMap policyMap = new PolicyMap();
>     PolicyEntry defaultEntry = new PolicyEntry();
>     policyMap.setDefaultEntry(defaultEntry);
>     broker.setDestinationPolicy(policyMap);
>     return broker;
>   }
>   private static Message getMessage(String messageId, ActiveMQMessageConsumer consumer) throws JMSException {
>     String receivedMessageId = null;
>     Instant start = Instant.now();
>     while (receivedMessageId == null || !Objects.equals(messageId, receivedMessageId)) {
>       if (Instant.now().isAfter(start.plus(5, ChronoUnit.SECONDS))) {
>         Assert.fail("timeout");
>       }
>       Message msg = consumer.receive(20000);
>       Assert.assertNotNull("Couldn't get message", msg);
>       receivedMessageId = msg.getStringProperty("my_id");
>       if (!Objects.equals(messageId, receivedMessageId)) {
>         logger.info("Got the wrong message. Looping.");
>       } else {
>         logger.info("Found message");
>         return msg;
>       }
>     }
>     return null;
>   }
>   private static Xid createXid() throws IOException {
>     final AtomicLong txGenerator = new AtomicLong(System.currentTimeMillis());
>     java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
>     DataOutputStream os = new DataOutputStream(baos);
>     os.writeLong(txGenerator.incrementAndGet());
>     os.close();
>     final byte[] bs = baos.toByteArray();
>     return new Xid() {
>       @Override
>       public int getFormatId() {
>         return 86;
>       }
>       @Override
>       public byte[] getGlobalTransactionId() {
>         return bs;
>       }
>       @Override
>       public byte[] getBranchQualifier() {
>         return bs;
>       }
>     };
>   }
>   @Test
>   public void failoverWithExceptionProgrammaticBrokers() throws Exception {
>     BrokerService broker1 = createBroker(true, "tcp://localhost:11001");
>     broker1.setBrokerName("broker1");
>     BrokerService broker2 = createBroker(true, "tcp://localhost:11002");
>     broker2.setBrokerName("broker2");
>     XAConnection producerConnection = null;
>     ActiveMQXAConnection consumerConnection = null;
>     try {
>       System.setProperty("org.slf4j.simpleLogger.log." + FailoverTransport.class.getName(), "DEBUG");
>       broker1.start();
>       broker2.start();
>       await().atMost(Duration.ofSeconds(10)).until(() -> broker1.isStarted() && broker2.isStarted());
>       broker1.addNetworkConnector("static:(tcp://localhost:11002)");
>       broker2.addNetworkConnector("static:(tcp://localhost:11001)");
>       broker1.getNetworkConnectors().get(0).start();
>       broker2.getNetworkConnectors().get(0).start();
>       await().atMost(Duration.ofSeconds(30))
>           .until(() -> broker1.getNetworkConnectors().get(0).isStarted() &&
>               broker2.getNetworkConnectors().get(0).isStarted());
>       String queueName = "MY_QUEUE";
>       String url = "failover:(tcp://localhost:11001,tcp://localhost:11002)";
>       ActiveMQXAConnectionFactory firstFactory = new ActiveMQXAConnectionFactory(url);
>       producerConnection = firstFactory.createXAConnection();
>       producerConnection.setClientID("PRODUCER");
>       producerConnection.start();
>       XASession producerSession = producerConnection.createXASession();
>       Queue producerDestination = producerSession.createQueue(queueName);
>       Xid xid = createXid();
>       producerSession.getXAResource().start(xid, XAResource.TMNOFLAGS);
>       String messageId = UUID.randomUUID().toString();
>       MessageProducer producer = producerSession.createProducer(producerDestination);
>       TextMessage sendMessage = producerSession.createTextMessage("Test message");
>       sendMessage.setStringProperty("my_id", messageId);
>       producer.send(sendMessage);
>       producerSession.getXAResource().end(xid, XAResource.TMSUCCESS);
>       producerSession.getXAResource().prepare(xid);
>       producerSession.getXAResource().commit(xid, false);
>       consumerConnection = (ActiveMQXAConnection) firstFactory.createXAConnection();
>       consumerConnection.setClientID("CONSUMER");
>       consumerConnection.start();
>       XASession consumerSession = consumerConnection.createXASession();
>       Queue consumerDestination = consumerSession.createQueue(queueName);
>       ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(consumerDestination);
>       Xid consumerXid = createXid();
>       consumerSession.getXAResource().start(consumerXid, XAResource.TMNOFLAGS);
>       Message message = getMessage(messageId, consumer);
>       consumerSession.getXAResource().end(consumerXid, XAResource.TMSUCCESS);
>       consumerSession.getXAResource().prepare(consumerXid);
>       logger.info("Simulating dropped connection");
>       FailoverTransport transport = consumerConnection.getTransport().narrow(FailoverTransport.class);
>       URI currentTransport = transport.getConnectedTransportURI();
>       transport.handleTransportFailure(new IOException("Fake fail"));
>       await().atMost(Duration.ofSeconds(10)).until(() -> !Objects.equals(currentTransport, transport.getConnectedTransportURI()) && transport.isConnected());
>       Assert.assertTrue(transport.isConnected());
>       Assert.assertNotEquals(currentTransport, transport.getConnectedTransportURI());
>       message.acknowledge();
>       consumerSession.getXAResource().commit(consumerXid, false);
>     } catch (XAException e) {
>       if (e.errorCode == -4) {
>         logger.info("Recreated error successfully");
>       } else {
>         logger.error("Got XAException " + e.errorCode, e);
>         Assert.fail();
>       }
>     } finally {
>       producerConnection.close();
>       consumerConnection.close();
>       broker1.getNetworkConnectors().get(0).stop();
>       broker2.getNetworkConnectors().get(0).stop();
>       await().atMost(Duration.ofSeconds(5)).until(() -> broker1.getNetworkConnectors().get(0).isStopped() && broker2.getNetworkConnectors().get(0).isStopped());
>       broker1.stop();
>       broker2.stop();
>     }
>   }
> }
> {code}
> pom.xml:
> {code:xml}
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>   <groupId>test</groupId>
>   <artifactId>cluster-test</artifactId>
>   <version>1</version>
>   <properties>
>     <!--    <version.activemq>5.11.0.redhat-630475</version.activemq>-->
>     <version.activemq>5.16.0</version.activemq>
>     <maven.compiler.target>1.8</maven.compiler.target>
>     <maven.compiler.source>1.8</maven.compiler.source>
>   </properties>
>   <dependencies>
>     <dependency>
>       <groupId>junit</groupId>
>       <artifactId>junit</artifactId>
>       <version>4.13</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.activemq</groupId>
>       <artifactId>activemq-broker</artifactId>
>       <version>${version.activemq}</version>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.activemq</groupId>
>       <artifactId>activemq-kahadb-store</artifactId>
>       <version>${version.activemq}</version>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-simple</artifactId>
>       <version>1.7.30</version>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.awaitility</groupId>
>       <artifactId>awaitility</artifactId>
>       <version>4.0.3</version>
>       <scope>test</scope>
>     </dependency>
>   </dependencies>
> </project>
> {code}
> Thanks



--
This message was sent by Atlassian Jira
(v8.3.4#803005)