You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2020/06/11 18:39:35 UTC
[activemq] branch master updated: AMQ-7497 - support reconnect of the single RA xaResource connection
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new ed41101 AMQ-7497 - support reconnect of the single RA xaResource connection
ed41101 is described below
commit ed4110175534f63e0fac834a63a05440d509752c
Author: gtully <ga...@gmail.com>
AuthorDate: Thu Jun 11 19:39:18 2020 +0100
AMQ-7497 - support reconnect of the single RA xaResource connection
---
.../org/apache/activemq/TransactionContext.java | 4 +-
.../activemq/ra/ActiveMQResourceAdapter.java | 121 +++++++++++++++++++--
.../activemq/ra/ActiveMQConnectionFactoryTest.java | 104 ++++++++++++++++++
3 files changed, 218 insertions(+), 11 deletions(-)
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index fbc2fdf..9815a38 100644
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -795,7 +795,7 @@ public class TransactionContext implements XAResource {
* @param e JMSException to convert
* @return XAException wrapping original exception or its message
*/
- private XAException toXAException(JMSException e) {
+ public static XAException toXAException(JMSException e) {
if (e.getCause() != null && e.getCause() instanceof XAException) {
XAException original = (XAException)e.getCause();
XAException xae = new XAException(original.getMessage());
@@ -818,7 +818,7 @@ public class TransactionContext implements XAResource {
return xae;
}
- private int parseFromMessageOr(String message, int fallbackCode) {
+ private static int parseFromMessageOr(String message, int fallbackCode) {
final String marker = "xaErrorCode:";
final int index = message.lastIndexOf(marker);
if (index > -1) {
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
index 4c3f5c1..e4edb06 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
@@ -43,6 +43,8 @@ import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.activemq.TransactionContext.toXAException;
+
/**
* Knows how to connect to one ActiveMQ server. It can then activate endpoints
* and deliver messages to those end points using the connection configure in
@@ -62,7 +64,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
private transient BrokerService broker;
private transient Thread brokerStartThread;
private ActiveMQConnectionFactory connectionFactory;
- private transient TransactionContext xaRecoveryTransactionContext;
+ private transient ReconnectingXAResource reconnectingXaResource;
/**
*
@@ -174,15 +176,13 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
ServiceSupport.dispose(broker);
broker = null;
}
- if (xaRecoveryTransactionContext != null) {
- try {
- xaRecoveryTransactionContext.getConnection().close();
- } catch (Throwable ignored) {}
+ if (reconnectingXaResource != null) {
+ reconnectingXaResource.stop();
}
}
this.bootstrapContext = null;
- this.xaRecoveryTransactionContext = null;
+ this.reconnectingXaResource = null;
}
/**
@@ -269,17 +269,120 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
}
try {
synchronized ( this ) {
- if (xaRecoveryTransactionContext == null) {
+ if (reconnectingXaResource == null) {
LOG.debug("Init XAResource with: " + this.getInfo());
- xaRecoveryTransactionContext = new TransactionContext(makeConnection());
+ reconnectingXaResource = new ReconnectingXAResource(new TransactionContext(makeConnection()));
}
}
- return new XAResource[]{ xaRecoveryTransactionContext };
+
+ return new XAResource[]{reconnectingXaResource};
+
} catch (Exception e) {
throw new ResourceException(e);
}
}
+ private void ensureConnection(TransactionContext xaRecoveryTransactionContext) throws XAException {
+ final ActiveMQConnection existingConnection = xaRecoveryTransactionContext.getConnection();
+ if (existingConnection == null || existingConnection.isTransportFailed()) {
+ try {
+ LOG.debug("reconnect XAResource with: " + this.getInfo(), existingConnection == null ? "" : existingConnection.getFirstFailureError());
+ xaRecoveryTransactionContext.setConnection(makeConnection());
+ } catch (JMSException e) {
+ throw toXAException(e);
+ } finally {
+ if (existingConnection != null) {
+ try {
+ existingConnection.close();
+ } catch (Exception ignored) {
+ }
+ }
+ }
+ }
+ }
+
+ private class ReconnectingXAResource implements XAResource {
+ protected TransactionContext delegate;
+
+ ReconnectingXAResource(TransactionContext delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void commit(Xid xid, boolean b) throws XAException {
+ ensureConnection(delegate);
+ delegate.commit(xid, b);
+ }
+
+ @Override
+ public void end(Xid xid, int i) throws XAException {
+ ensureConnection(delegate);
+ delegate.end(xid, i);
+ }
+
+ @Override
+ public void forget(Xid xid) throws XAException {
+ ensureConnection(delegate);
+ delegate.forget(xid);
+ }
+
+ @Override
+ public int getTransactionTimeout() throws XAException {
+ ensureConnection(delegate);
+ return delegate.getTransactionTimeout();
+ }
+
+ @Override
+ public boolean isSameRM(XAResource xaResource) throws XAException {
+ if (this == xaResource) {
+ return true;
+ }
+ if (!(xaResource instanceof ReconnectingXAResource)) {
+ return false;
+ }
+
+ ensureConnection(delegate);
+ return delegate.isSameRM(((ReconnectingXAResource)xaResource).delegate);
+ }
+
+ @Override
+ public int prepare(Xid xid) throws XAException {
+ ensureConnection(delegate);
+ return delegate.prepare(xid);
+ }
+
+ @Override
+ public Xid[] recover(int i) throws XAException {
+ ensureConnection(delegate);
+ return delegate.recover(i);
+ }
+
+ @Override
+ public void rollback(Xid xid) throws XAException {
+ ensureConnection(delegate);
+ delegate.rollback(xid);
+
+ }
+
+ @Override
+ public boolean setTransactionTimeout(int i) throws XAException {
+ ensureConnection(delegate);
+ return delegate.setTransactionTimeout(i);
+ }
+
+ @Override
+ public void start(Xid xid, int i) throws XAException {
+ ensureConnection(delegate);
+ delegate.start(xid, i);
+ }
+
+ public void stop() {
+ try {
+ delegate.getConnection().close();
+ } catch (Throwable ignored) {}
+ }
+ };
+
// ///////////////////////////////////////////////////////////////////////
//
// Java Bean getters and setters for this ResourceAdapter class.
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
index 2d11666..034e3d7 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.ra;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -32,6 +33,9 @@ import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQTopicSubscriber;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.Wait;
import org.junit.Before;
import org.junit.Test;
@@ -115,5 +119,105 @@ public class ActiveMQConnectionFactoryTest {
assertEquals("one resource", 1, resource2.length);
assertTrue("isSameRM true", resources[0].isSameRM(resource2[0]));
assertTrue("the same instance", resources[0].equals(resource2[0]));
+
+ ra.stop();
}
+
+
+ @Test
+ public void testXAResourceReconnect() throws Exception {
+
+ BrokerService brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.addConnector("tcp://localhost:0");
+ brokerService.start();
+
+ try {
+ final TransportConnector transportConnector = brokerService.getTransportConnectors().get(0);
+
+ String failoverUrl = String.format("failover:(%s)?maxReconnectAttempts=1", transportConnector.getConnectUri());
+
+ ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
+ ra.start(null);
+ ra.setServerUrl(failoverUrl);
+ ra.setUserName(user);
+ ra.setPassword(pwd);
+
+ XAResource[] resources = ra.getXAResources(null);
+ assertEquals("one resource", 1, resources.length);
+
+ assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+ transportConnector.stop();
+ assertTrue("no connections", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return transportConnector.getConnections().isEmpty();
+ }
+ }));
+
+ try {
+ resources[0].recover(100);
+ fail("Expect error on broken connection");
+ } catch (Exception expected) {
+ }
+
+ transportConnector.start();
+
+ // should recover ok
+ assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+ } finally {
+ brokerService.stop();
+ }
+ }
+
+ @Test
+ public void testXAResourceFailoverFailBack() throws Exception {
+
+ BrokerService brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.addConnector("tcp://localhost:0");
+ brokerService.addConnector("tcp://localhost:0");
+ brokerService.start();
+
+ try {
+
+ final TransportConnector primary = brokerService.getTransportConnectors().get(0);
+ final TransportConnector secondary = brokerService.getTransportConnectors().get(1);
+
+ String failoverUrl = String.format("failover:(%s,%s)?maxReconnectAttempts=1&randomize=false", primary.getConnectUri(), secondary.getConnectUri());
+
+ ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
+ ra.start(null);
+ ra.setServerUrl(failoverUrl);
+ ra.setUserName(user);
+ ra.setPassword(pwd);
+
+ XAResource[] resources = ra.getXAResources(null);
+ assertEquals("one resource", 1, resources.length);
+
+ assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+ primary.stop();
+
+ // should recover ok
+ assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+ primary.start();
+
+ // should be ok
+ assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+ secondary.stop();
+
+ // should recover ok
+ assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+ } finally {
+ brokerService.stop();
+ }
+
+ }
+
}