You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2019/02/26 12:27:17 UTC
[activemq] branch master updated: AMQ-5790 - rework the fix from
AMQ-4486 to tie the inbound xaResource connection lifecycle to the rar
start/stop
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 5bd2abf AMQ-5790 - rework the fix from AMQ-4486 to tie the inbound xaResource connection lifecycle to the rar start/stop
5bd2abf is described below
commit 5bd2abf85dbda14bf41f54d09af4866e814f931f
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Feb 26 12:26:18 2019 +0000
AMQ-5790 - rework the fix from AMQ-4486 to tie the inbound xaResource connection lifecycle to the rar start/stop
---
.../activemq/ra/ActiveMQConnectionRequestInfo.java | 2 +-
.../activemq/ra/ActiveMQResourceAdapter.java | 158 ++++-----------------
...ctiveMQConnectionExecutorThreadCleanUpTest.java | 3 +-
.../activemq/ra/ActiveMQConnectionFactoryTest.java | 4 +-
4 files changed, 31 insertions(+), 136 deletions(-)
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java
index 7ed3f26..5086b41 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionRequestInfo.java
@@ -271,7 +271,7 @@ public class ActiveMQConnectionRequestInfo implements ConnectionRequestInfo, Ser
@Override
public String toString() {
return new StringBuffer("ActiveMQConnectionRequestInfo{ userName = '").append(userName).append("' ").append(", serverUrl = '").append(serverUrl)
- .append("' ").append(", clientid = '").append(clientid).append("' ").append(", userName = '").append(userName).append("' ")
+ .append("' ").append(", clientid = '").append(clientid).append("' ")
.append(", useSessionArgs = '").append(useSessionArgs).append("' ").append(", useInboundSession = '").append(useInboundSession).append("' }")
.toString();
}
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
index b17674e..8d7dd6f 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
@@ -18,7 +18,9 @@ package org.apache.activemq.ra;
import java.io.Serializable;
import java.net.URI;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.resource.NotSupportedException;
@@ -54,12 +56,13 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
private static final long serialVersionUID = 360805587169336959L;
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);
private transient final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
-
+ private final AtomicBoolean started = new AtomicBoolean(false);
private transient BootstrapContext bootstrapContext;
private String brokerXmlConfig;
private transient BrokerService broker;
private transient Thread brokerStartThread;
private ActiveMQConnectionFactory connectionFactory;
+ private transient TransactionContext xaRecoveryTransactionContext;
/**
*
@@ -73,6 +76,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
*/
@Override
public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
+ log.debug("Start: " + this.getInfo());
this.bootstrapContext = bootstrapContext;
if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
brokerStartThread = new Thread("Starting ActiveMQ Broker") {
@@ -108,6 +112,7 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
Thread.currentThread().interrupt();
}
}
+ started.compareAndSet(false, true);
}
public ActiveMQConnection makeConnection() throws JMSException {
@@ -152,6 +157,8 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
*/
@Override
public void stop() {
+ log.debug("Stop: " + this.getInfo());
+ started.compareAndSet(true, false);
synchronized (endpointWorkers) {
while (endpointWorkers.size() > 0) {
ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
@@ -167,9 +174,15 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
ServiceSupport.dispose(broker);
broker = null;
}
+ if (xaRecoveryTransactionContext != null) {
+ try {
+ xaRecoveryTransactionContext.getConnection().close();
+ } catch (Throwable ignored) {}
+ }
}
this.bootstrapContext = null;
+ this.xaRecoveryTransactionContext = null;
}
/**
@@ -249,138 +262,19 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
*/
@Override
public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
+ LOG.debug("getXAResources: activationSpecs" + (activationSpecs != null ? Arrays.asList(activationSpecs) : "[]") + ", info: " + getInfo());
+ if (!started.get()) {
+ LOG.debug("RAR[" + this.getInfo() + "] stopped or undeployed; no connection available for xa recovery");
+ return new XAResource[]{};
+ }
try {
- return new XAResource[]{
- new TransactionContext() {
-
- @Override
- public boolean isSameRM(XAResource xaresource) throws XAException {
- ActiveMQConnection original = null;
- try {
- original = setConnection(newConnection());
- boolean result = super.isSameRM(xaresource);
- LOG.trace("{}.recover({})={}", getConnection(), xaresource, result);
- return result;
-
- } catch (JMSException e) {
- LOG.trace("isSameRM({}) failed", xaresource, e);
- XAException xaException = new XAException(e.getMessage());
- throw xaException;
- } finally {
- closeConnection(original);
- }
- }
-
- @Override
- protected String getResourceManagerId() throws JMSException {
- ActiveMQConnection original = null;
- try {
- original = setConnection(newConnection());
- return super.getResourceManagerId();
- } finally {
- closeConnection(original);
- }
- }
-
- @Override
- public void commit(Xid xid, boolean onePhase) throws XAException {
- ActiveMQConnection original = null;
- try {
- setConnection(newConnection());
- super.commit(xid, onePhase);
- LOG.trace("{}.commit({},{})", getConnection(), xid);
-
- } catch (JMSException e) {
- LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e);
- throwXAException(e);
- } finally {
- closeConnection(original);
- }
- }
-
- @Override
- public void rollback(Xid xid) throws XAException {
- ActiveMQConnection original = null;
- try {
- original = setConnection(newConnection());
- super.rollback(xid);
- LOG.trace("{}.rollback({})", getConnection(), xid);
-
- } catch (JMSException e) {
- LOG.trace("{}.rollback({}) failed", getConnection(), xid, e);
- throwXAException(e);
- } finally {
- closeConnection(original);
- }
- }
-
- @Override
- public Xid[] recover(int flags) throws XAException {
- Xid[] result = new Xid[]{};
- ActiveMQConnection original = null;
- try {
- original = setConnection(newConnection());
- result = super.recover(flags);
- LOG.trace("{}.recover({})={}", getConnection(), flags, result);
-
- } catch (JMSException e) {
- LOG.trace("{}.recover({}) failed", getConnection(), flags, e);
- throwXAException(e);
- } finally {
- closeConnection(original);
- }
- return result;
- }
-
- @Override
- public void forget(Xid xid) throws XAException {
- ActiveMQConnection original = null;
- try {
- original = setConnection(newConnection());
- super.forget(xid);
- LOG.trace("{}.forget({})", getConnection(), xid);
-
- } catch (JMSException e) {
- LOG.trace("{}.forget({}) failed", getConnection(), xid, e);
- throwXAException(e);
- } finally {
- closeConnection(original);
- }
- }
-
- private void throwXAException(JMSException e) throws XAException {
- XAException xaException = new XAException(e.getMessage());
- xaException.errorCode = XAException.XAER_RMFAIL;
- throw xaException;
- }
-
- private ActiveMQConnection newConnection() throws JMSException {
- ActiveMQConnection connection = null;
- try {
- connection = makeConnection();
- connection.start();
- } catch (JMSException ex) {
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException ignore) { }
- }
- throw ex;
- }
- return connection;
- }
-
- private void closeConnection(ActiveMQConnection original) {
- ActiveMQConnection connection = getConnection();
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException ignored) {}
- }
- setConnection(original);
- }
- }};
-
+ synchronized ( this ) {
+ if (xaRecoveryTransactionContext == null) {
+ LOG.debug("Init XAResource with: " + this.getInfo());
+ xaRecoveryTransactionContext = new TransactionContext(makeConnection());
+ }
+ }
+ return new XAResource[]{ xaRecoveryTransactionContext };
} catch (Exception e) {
throw new ResourceException(e);
}
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java
index ccb0d6d..684d101 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java
@@ -96,12 +96,14 @@ public class ActiveMQConnectionExecutorThreadCleanUpTest {
// connection error.
for (int i=0; i<10; i++) {
LOG.debug("Iteration " + i);
+ ra.start(null);
try {
XAResource[] resources = ra.getXAResources(null);
resources[0].recover(100);
} catch (Exception ex) {
LOG.error(ex.getMessage());
}
+ ra.stop();
// allow some small time for thread cleanup to happen
Thread.sleep(300);
@@ -111,7 +113,6 @@ public class ActiveMQConnectionExecutorThreadCleanUpTest {
"\" not cleared up with ActiveMQConnection.",
hasActiveMQConnectionExceutorThread());
}
- ra.stop();
}
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
index b3e6349..2d11666 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
@@ -17,7 +17,6 @@
package org.apache.activemq.ra;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -101,6 +100,7 @@ public class ActiveMQConnectionFactoryTest {
@Test(timeout = 60000)
public void testGetXAResource() throws Exception {
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
+ ra.start(null);
ra.setServerUrl(url);
ra.setUserName(user);
ra.setPassword(pwd);
@@ -114,6 +114,6 @@ public class ActiveMQConnectionFactoryTest {
XAResource[] resource2 = ra.getXAResources(null);
assertEquals("one resource", 1, resource2.length);
assertTrue("isSameRM true", resources[0].isSameRM(resource2[0]));
- assertFalse("no tthe same instance", resources[0].equals(resource2[0]));
+ assertTrue("the same instance", resources[0].equals(resource2[0]));
}
}