You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2019/02/26 12:27:17 UTC

[activemq] branch master updated: AMQ-5790 - rework the fix from AMQ-4486 to tie the inbound xaResource connection lifecycle to the rar start/stop

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bd2abf  AMQ-5790 - rework the fix from AMQ-4486 to tie the inbound xaResource connection lifecycle to the rar start/stop
5bd2abf is described below

commit 5bd2abf85dbda14bf41f54d09af4866e814f931f
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Feb 26 12:26:18 2019 +0000

    AMQ-5790 - rework the fix from AMQ-4486 to tie the inbound xaResource connection lifecycle to the rar start/stop
---
 .../activemq/ra/ActiveMQConnectionRequestInfo.java |   2 +-
 .../activemq/ra/ActiveMQResourceAdapter.java       | 158 ++++-----------------
 ...ctiveMQConnectionExecutorThreadCleanUpTest.java |   3 +-
 .../activemq/ra/ActiveMQConnectionFactoryTest.java |   4 +-
 4 files changed, 31 insertions(+), 136 deletions(-)

diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java
index 7ed3f26..5086b41 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java
@@ -271,7 +271,7 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
     @Override
     public String toString() {
         return new StringBuffer("ActiveMQConnectionRequestInfo{ userName = '").append(userName).append("' ").append(", serverUrl = '").append(serverUrl)
-            .append("' ").append(", clientid = '").append(clientid).append("' ").append(", userName = '").append(userName).append("' ")
+            .append("' ").append(", clientid = '").append(clientid).append("' ")
             .append(", useSessionArgs = '").append(useSessionArgs).append("' ").append(", useInboundSession = '").append(useInboundSession).append("'  }")
             .toString();
     }
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
index b17674e..8d7dd6f 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
@@ -18,7 +18,9 @@ package org.apache.activemq.ra;
 
 import java.io.Serializable;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
 import javax.resource.NotSupportedException;
@@ -54,12 +56,13 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
     private static final long serialVersionUID = 360805587169336959L;
     private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);
     private transient final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
-
+    private final AtomicBoolean started = new AtomicBoolean(false);
     private transient BootstrapContext bootstrapContext;
     private String brokerXmlConfig;
     private transient BrokerService broker;
     private transient Thread brokerStartThread;
     private ActiveMQConnectionFactory connectionFactory;
+    private transient TransactionContext xaRecoveryTransactionContext;
 
     /**
      *
@@ -73,6 +76,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
      */
     @Override
     public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
+        log.debug("Start: " + this.getInfo());
         this.bootstrapContext = bootstrapContext;
         if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
             brokerStartThread = new Thread("Starting ActiveMQ Broker") {
@@ -108,6 +112,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
                 Thread.currentThread().interrupt();
             }
         }
+        started.compareAndSet(false, true);
     }
 
     public ActiveMQConnection makeConnection() throws JMSException {
@@ -152,6 +157,8 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
      */
     @Override
     public void stop() {
+        log.debug("Stop: " + this.getInfo());
+        started.compareAndSet(true, false);
         synchronized (endpointWorkers) {
             while (endpointWorkers.size() > 0) {
                 ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
@@ -167,9 +174,15 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
                 ServiceSupport.dispose(broker);
                 broker = null;
             }
+            if (xaRecoveryTransactionContext != null) {
+                try {
+                    xaRecoveryTransactionContext.getConnection().close();
+                } catch (Throwable ignored) {}
+            }
         }
 
         this.bootstrapContext = null;
+        this.xaRecoveryTransactionContext = null;
     }
 
     /**
@@ -249,138 +262,19 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
      */
     @Override
     public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
+        LOG.debug("getXAResources: activationSpecs" + (activationSpecs != null ? Arrays.asList(activationSpecs) : "[]") + ", info: " + getInfo());
+        if (!started.get()) {
+            LOG.debug("RAR[" + this.getInfo() + "] stopped or undeployed; no connection available for xa recovery");
+            return new XAResource[]{};
+        }
         try {
-            return new XAResource[]{
-                    new TransactionContext() {
-
-                        @Override
-                        public boolean isSameRM(XAResource xaresource) throws XAException {
-                            ActiveMQConnection original = null;
-                            try {
-                                original = setConnection(newConnection());
-                                boolean result = super.isSameRM(xaresource);
-                                LOG.trace("{}.recover({})={}", getConnection(), xaresource, result);
-                                return result;
-
-                            } catch (JMSException e) {
-                                LOG.trace("isSameRM({}) failed", xaresource, e);
-                                XAException xaException = new XAException(e.getMessage());
-                                throw xaException;
-                            } finally {
-                                closeConnection(original);
-                            }
-                        }
-
-                        @Override
-                        protected String getResourceManagerId() throws JMSException {
-                            ActiveMQConnection original = null;
-                            try {
-                                original = setConnection(newConnection());
-                                return super.getResourceManagerId();
-                            } finally {
-                                closeConnection(original);
-                            }
-                        }
-
-                        @Override
-                        public void commit(Xid xid, boolean onePhase) throws XAException {
-                            ActiveMQConnection original = null;
-                            try {
-                                setConnection(newConnection());
-                                super.commit(xid, onePhase);
-                                LOG.trace("{}.commit({},{})", getConnection(), xid);
-
-                            } catch (JMSException e) {
-                                LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e);
-                                throwXAException(e);
-                            } finally {
-                                closeConnection(original);
-                            }
-                        }
-
-                        @Override
-                        public void rollback(Xid xid) throws XAException {
-                            ActiveMQConnection original = null;
-                            try {
-                                original = setConnection(newConnection());
-                                super.rollback(xid);
-                                LOG.trace("{}.rollback({})", getConnection(), xid);
-
-                            } catch (JMSException e) {
-                                LOG.trace("{}.rollback({}) failed", getConnection(), xid, e);
-                                throwXAException(e);
-                            } finally {
-                               closeConnection(original);
-                            }
-                        }
-
-                        @Override
-                        public Xid[] recover(int flags) throws XAException {
-                            Xid[] result = new Xid[]{};
-                            ActiveMQConnection original = null;
-                            try {
-                                original = setConnection(newConnection());
-                                result = super.recover(flags);
-                                LOG.trace("{}.recover({})={}", getConnection(), flags, result);
-
-                            } catch (JMSException e) {
-                                LOG.trace("{}.recover({}) failed", getConnection(), flags, e);
-                                throwXAException(e);
-                            } finally {
-                                closeConnection(original);
-                            }
-                            return result;
-                        }
-
-                        @Override
-                        public void forget(Xid xid) throws XAException {
-                            ActiveMQConnection original = null;
-                            try {
-                                original = setConnection(newConnection());
-                                super.forget(xid);
-                                LOG.trace("{}.forget({})", getConnection(), xid);
-
-                            } catch (JMSException e) {
-                                LOG.trace("{}.forget({}) failed", getConnection(), xid, e);
-                                throwXAException(e);
-                            } finally {
-                                closeConnection(original);
-                            }
-                        }
-
-                        private void throwXAException(JMSException e) throws XAException {
-                            XAException xaException = new XAException(e.getMessage());
-                            xaException.errorCode = XAException.XAER_RMFAIL;
-                            throw xaException;
-                        }
-
-                        private ActiveMQConnection newConnection() throws JMSException {
-                            ActiveMQConnection connection = null;
-                            try {
-                                connection = makeConnection();
-                                connection.start();
-                            } catch (JMSException ex) {
-                                if (connection != null) {
-                                    try {
-                                        connection.close();
-                                    } catch (JMSException ignore) { }
-                                }
-                                throw ex;
-                            }
-                            return connection;
-                        }
-
-                        private void closeConnection(ActiveMQConnection original) {
-                            ActiveMQConnection connection = getConnection();
-                            if (connection != null) {
-                                try {
-                                    connection.close();
-                                } catch (JMSException ignored) {}
-                            }
-                            setConnection(original);
-                        }
-                    }};
-
+            synchronized ( this ) {
+                if (xaRecoveryTransactionContext == null) {
+                    LOG.debug("Init XAResource with: " + this.getInfo());
+                    xaRecoveryTransactionContext = new TransactionContext(makeConnection());
+                }
+            }
+            return new XAResource[]{ xaRecoveryTransactionContext };
         } catch (Exception e) {
             throw new ResourceException(e);
         }
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java
index ccb0d6d..684d101 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java
@@ -96,12 +96,14 @@ public class ActiveMQConnectionExecutorThreadCleanUpTest {
         // connection error.
         for (int i=0; i<10; i++) {
             LOG.debug("Iteration " + i);
+            ra.start(null);
             try {
                 XAResource[] resources = ra.getXAResources(null);
                 resources[0].recover(100);
             } catch (Exception ex) {
                 LOG.error(ex.getMessage());
             }
+            ra.stop();
             // allow some small time for thread cleanup to happen
             Thread.sleep(300);
 
@@ -111,7 +113,6 @@ public class ActiveMQConnectionExecutorThreadCleanUpTest {
                     "\" not cleared up with ActiveMQConnection.",
                 hasActiveMQConnectionExceutorThread());
         }
-        ra.stop();
     }
 
 
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
index b3e6349..2d11666 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.ra;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -101,6 +100,7 @@ public class ActiveMQConnectionFactoryTest {
     @Test(timeout = 60000)
     public void testGetXAResource() throws Exception {
         ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
+        ra.start(null);
         ra.setServerUrl(url);
         ra.setUserName(user);
         ra.setPassword(pwd);
@@ -114,6 +114,6 @@ public class ActiveMQConnectionFactoryTest {
         XAResource[] resource2 = ra.getXAResources(null);
         assertEquals("one resource", 1, resource2.length);
         assertTrue("isSameRM true", resources[0].isSameRM(resource2[0]));
-        assertFalse("no tthe same instance", resources[0].equals(resource2[0]));
+        assertTrue("the same instance", resources[0].equals(resource2[0]));
     }
 }