You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2020/06/11 18:39:35 UTC

[activemq] branch master updated: AMQ-7497 - support reconnect of the single RA xaResource connection

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new ed41101  AMQ-7497 - support reconnect of the single RA xaResource connection
ed41101 is described below

commit ed4110175534f63e0fac834a63a05440d509752c
Author: gtully <ga...@gmail.com>
AuthorDate: Thu Jun 11 19:39:18 2020 +0100

    AMQ-7497 - support reconnect of the single RA xaResource connection
---
 .../org/apache/activemq/TransactionContext.java    |   4 +-
 .../activemq/ra/ActiveMQResourceAdapter.java       | 121 +++++++++++++++++++--
 .../activemq/ra/ActiveMQConnectionFactoryTest.java | 104 ++++++++++++++++++
 3 files changed, 218 insertions(+), 11 deletions(-)

diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index fbc2fdf..9815a38 100644
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -795,7 +795,7 @@ public class TransactionContext implements XAResource {
      * @param e JMSException to convert
      * @return XAException wrapping original exception or its message
      */
-    private XAException toXAException(JMSException e) {
+    public static XAException toXAException(JMSException e) {
         if (e.getCause() != null && e.getCause() instanceof XAException) {
             XAException original = (XAException)e.getCause();
             XAException xae = new XAException(original.getMessage());
@@ -818,7 +818,7 @@ public class TransactionContext implements XAResource {
         return xae;
     }
 
-    private int parseFromMessageOr(String message, int fallbackCode) {
+    private static int parseFromMessageOr(String message, int fallbackCode) {
         final String marker = "xaErrorCode:";
         final int index = message.lastIndexOf(marker);
         if (index > -1) {
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
index 4c3f5c1..e4edb06 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
@@ -43,6 +43,8 @@ import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.activemq.TransactionContext.toXAException;
+
 /**
  * Knows how to connect to one ActiveMQ server. It can then activate endpoints
  * and deliver messages to those end points using the connection configured in
@@ -62,7 +64,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
     private transient BrokerService broker;
     private transient Thread brokerStartThread;
     private ActiveMQConnectionFactory connectionFactory;
-    private transient TransactionContext xaRecoveryTransactionContext;
+    private transient ReconnectingXAResource reconnectingXaResource;
 
     /**
      *
@@ -174,15 +176,13 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
                 ServiceSupport.dispose(broker);
                 broker = null;
             }
-            if (xaRecoveryTransactionContext != null) {
-                try {
-                    xaRecoveryTransactionContext.getConnection().close();
-                } catch (Throwable ignored) {}
+            if (reconnectingXaResource != null) {
+                reconnectingXaResource.stop();
             }
         }
 
         this.bootstrapContext = null;
-        this.xaRecoveryTransactionContext = null;
+        this.reconnectingXaResource = null;
     }
 
     /**
@@ -269,17 +269,120 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
         }
         try {
             synchronized ( this ) {
-                if (xaRecoveryTransactionContext == null) {
+                if (reconnectingXaResource == null) {
                     LOG.debug("Init XAResource with: " + this.getInfo());
-                    xaRecoveryTransactionContext = new TransactionContext(makeConnection());
+                    reconnectingXaResource = new ReconnectingXAResource(new TransactionContext(makeConnection()));
                 }
             }
-            return new XAResource[]{ xaRecoveryTransactionContext };
+
+            return new XAResource[]{reconnectingXaResource};
+
         } catch (Exception e) {
             throw new ResourceException(e);
         }
     }
 
+    /**
+     * Ensures the given recovery transaction context has a usable connection.
+     * <p>
+     * If the context has no connection, or its connection reports a failed transport,
+     * a replacement is created with {@code makeConnection()} and set on the context.
+     * The previous connection, when there is one, is then closed on a best-effort
+     * basis, whether or not the replacement could be created.
+     *
+     * @param xaRecoveryTransactionContext context whose connection is checked and, if needed, replaced
+     * @throws XAException when creating the replacement connection fails with a {@link JMSException}
+     */
+    private void ensureConnection(TransactionContext xaRecoveryTransactionContext) throws XAException {
+        final ActiveMQConnection existingConnection = xaRecoveryTransactionContext.getConnection();
+        if (existingConnection != null && !existingConnection.isTransportFailed()) {
+            // connection is still usable, nothing to do
+            return;
+        }
+        try {
+            // Parameterized message instead of eager concatenation. SLF4J logs a trailing
+            // Throwable argument as the exception of the event (assumes getFirstFailureError()
+            // yields a Throwable - confirm); a null trailing argument is simply ignored.
+            LOG.debug("reconnect XAResource with: {}", this.getInfo(),
+                    existingConnection == null ? null : existingConnection.getFirstFailureError());
+            xaRecoveryTransactionContext.setConnection(makeConnection());
+        } catch (JMSException e) {
+            throw toXAException(e);
+        } finally {
+            if (existingConnection != null) {
+                try {
+                    existingConnection.close();
+                } catch (Exception ignored) {
+                    // best effort: the old connection is being discarded anyway
+                }
+            }
+        }
+    }
+
+    private class ReconnectingXAResource implements XAResource {
+        protected TransactionContext delegate;
+
+        ReconnectingXAResource(TransactionContext delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void commit(Xid xid, boolean b) throws XAException {
+            ensureConnection(delegate);
+            delegate.commit(xid, b);
+        }
+
+        @Override
+        public void end(Xid xid, int i) throws XAException {
+            ensureConnection(delegate);
+            delegate.end(xid, i);
+        }
+
+        @Override
+        public void forget(Xid xid) throws XAException {
+            ensureConnection(delegate);
+            delegate.forget(xid);
+        }
+
+        @Override
+        public int getTransactionTimeout() throws XAException {
+            ensureConnection(delegate);
+            return delegate.getTransactionTimeout();
+        }
+
+        @Override
+        public boolean isSameRM(XAResource xaResource) throws XAException {
+            if (this == xaResource) {
+                return true;
+            }
+            if (!(xaResource instanceof ReconnectingXAResource)) {
+                return false;
+            }
+
+            ensureConnection(delegate);
+            return delegate.isSameRM(((ReconnectingXAResource)xaResource).delegate);
+        }
+
+        @Override
+        public int prepare(Xid xid) throws XAException {
+            ensureConnection(delegate);
+            return delegate.prepare(xid);
+        }
+
+        @Override
+        public Xid[] recover(int i) throws XAException {
+            ensureConnection(delegate);
+            return delegate.recover(i);
+        }
+
+        @Override
+        public void rollback(Xid xid) throws XAException {
+            ensureConnection(delegate);
+            delegate.rollback(xid);
+
+        }
+
+        @Override
+        public boolean setTransactionTimeout(int i) throws XAException {
+            ensureConnection(delegate);
+            return delegate.setTransactionTimeout(i);
+        }
+
+        @Override
+        public void start(Xid xid, int i) throws XAException {
+            ensureConnection(delegate);
+            delegate.start(xid, i);
+        }
+
+        public void stop() {
+            try {
+                delegate.getConnection().close();
+            } catch (Throwable ignored) {}
+        }
+    };
+
     // ///////////////////////////////////////////////////////////////////////
     //
     // Java Bean getters and setters for this ResourceAdapter class.
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
index 2d11666..034e3d7 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.ra;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -32,6 +33,9 @@ import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQTopicSubscriber;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.Wait;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -115,5 +119,105 @@ public class ActiveMQConnectionFactoryTest {
         assertEquals("one resource", 1, resource2.length);
         assertTrue("isSameRM true", resources[0].isSameRM(resource2[0]));
         assertTrue("the same instance", resources[0].equals(resource2[0]));
+
+        ra.stop();
     }
+
+
+    /**
+     * Checks that the adapter's XAResource fails while the broker's only connector is
+     * stopped and that the same XAResource instance works again once the connector
+     * has been restarted.
+     */
+    @Test
+    public void testXAResourceReconnect() throws Exception {
+
+        BrokerService brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+
+        try {
+            final TransportConnector transportConnector = brokerService.getTransportConnectors().get(0);
+
+            String failoverUrl = String.format("failover:(%s)?maxReconnectAttempts=1", transportConnector.getConnectUri());
+
+            ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
+            ra.start(null);
+            try {
+                ra.setServerUrl(failoverUrl);
+                ra.setUserName(user);
+                ra.setPassword(pwd);
+
+                XAResource[] resources = ra.getXAResources(null);
+                assertEquals("one resource", 1, resources.length);
+
+                assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+                transportConnector.stop();
+                assertTrue("no connections", Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        return transportConnector.getConnections().isEmpty();
+                    }
+                }));
+
+                try {
+                    resources[0].recover(100);
+                    fail("Expect error on broken connection");
+                } catch (Exception expected) {
+                    // the connector is down, so the call must fail
+                }
+
+                transportConnector.start();
+
+                // should recover ok
+                assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+            } finally {
+                // close the adapter's XAResource connection so it does not outlive the test
+                ra.stop();
+            }
+
+        } finally {
+            brokerService.stop();
+        }
+    }
+
+    /**
+     * Checks that the adapter's XAResource keeps working across a failover URL with two
+     * connectors: after the primary is stopped, after it is started again, and after
+     * the secondary is stopped.
+     */
+    @Test
+    public void testXAResourceFailoverFailBack() throws Exception {
+
+        BrokerService brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+
+        try {
+
+            final TransportConnector primary = brokerService.getTransportConnectors().get(0);
+            final TransportConnector secondary = brokerService.getTransportConnectors().get(1);
+
+            String failoverUrl = String.format("failover:(%s,%s)?maxReconnectAttempts=1&randomize=false", primary.getConnectUri(), secondary.getConnectUri());
+
+            ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
+            ra.start(null);
+            try {
+                ra.setServerUrl(failoverUrl);
+                ra.setUserName(user);
+                ra.setPassword(pwd);
+
+                XAResource[] resources = ra.getXAResources(null);
+                assertEquals("one resource", 1, resources.length);
+
+                assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+                primary.stop();
+
+                // should recover ok
+                assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+                primary.start();
+
+                // should be ok
+                assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+                secondary.stop();
+
+                // should recover ok
+                assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+            } finally {
+                // close the adapter's XAResource connection so it does not outlive the test
+                ra.stop();
+            }
+
+        } finally {
+            brokerService.stop();
+        }
+
+    }
+
 }